drasi-source-platform 0.1.14

Platform source plugin for Drasi
Documentation
// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
 * Platform CloudEvent Format Specification
 *
 * This format is used by the Platform source to consume events from Redis Streams.
 * Events are wrapped in CloudEvent format and contain either data changes or control
 * messages for managing query subscriptions.
 *
 * Source: server-core/src/sources/platform/mod.rs
 */

import "@typespec/json-schema";
import "../../../../typespec/core-types.tsp";

using TypeSpec.JsonSchema;

namespace Drasi.Sources.Platform;

/**
 * CloudEvent wrapper for platform events.
 *
 * All events from the platform source are wrapped in CloudEvent format.
 * The CloudEvent specification is from https://cloudevents.io/
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 692-829)
 */
@jsonSchema
model CloudEvent {
  /** CloudEvent specification version */
  specversion?: string = "1.0";

  /** Event type identifier */
  type?: string;

  /** Event source identifier */
  source?: string;

  /** Unique event ID */
  id?: string;

  /** Event timestamp */
  time?: string;

  /** Content type of data */
  datacontenttype?: string = "application/json";

  /** Event data - array of PlatformEvent */
  data: PlatformEvent[];
}

/**
 * A platform event containing either data changes or control messages.
 *
 * Events are discriminated by the payload.source.db field:
 * - If db = "Drasi" (case-insensitive): Control message
 * - Otherwise: Data message
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 645-682)
 */
@jsonSchema
model PlatformEvent {
  /** Operation type: "i" (insert), "u" (update), "d" (delete) */
  op: PlatformOperation;

  /** Event payload containing source metadata and element data */
  payload: PlatformPayload;

  /** Optional reactivator start timestamp in nanoseconds */
  reactivatorStart_ns?: uint64;

  /** Optional reactivator end timestamp in nanoseconds */
  reactivatorEnd_ns?: uint64;
}

/**
 * Platform operation types.
 *
 * Based on CDC operation codes.
 */
@jsonSchema
enum PlatformOperation {
  /** Insert operation */
  insert: "i",

  /** Update operation */
  update: "u",

  /** Delete operation */
  delete: "d"
}

/**
 * Payload of a platform event.
 *
 * Contains the before/after state of the element and source metadata.
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 716-760)
 */
@jsonSchema
model PlatformPayload {
  /** State before the change (for update and delete) */
  before?: PlatformElementData;

  /** State after the change (for insert and update) */
  after?: PlatformElementData;

  /** Source metadata */
  source: PlatformSource;
}

/**
 * Source metadata for platform events.
 *
 * Identifies the source and timing of the event.
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 666-677)
 */
@jsonSchema
model PlatformSource {
  /** Database/namespace identifier */
  db: string;

  /** Table/collection name - also serves as element type ("node", "rel") or control type */
  table: string;

  /** Timestamp in nanoseconds when the change occurred */
  ts_ns: uint64;
}

/**
 * Element data in platform format.
 *
 * Represents a node or relation with its properties.
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 740-810)
 */
@jsonSchema
model PlatformElementData {
  /** Element ID */
  id: string;

  /** Element labels/types */
  labels: string[];

  /** Element properties (JSON object) */
  @extension("x-typespec-name", "properties")
  properties: {};

  /** For relations: ID of the source node */
  startId?: string;

  /** For relations: ID of the target node */
  endId?: string;
}

/**
 * Control message for source subscription management.
 *
 * Control messages use the same CloudEvent wrapper but with db="Drasi" and
 * table="SourceSubscription".
 *
 * Based on: server-core/src/sources/platform/mod.rs (lines 831-941)
 */
@jsonSchema
model SourceSubscriptionData {
  /** Query ID that this subscription belongs to */
  queryId: string;

  /** Query node ID (unique identifier for this query instance) */
  queryNodeId: string;

  /** Node labels to subscribe to */
  nodeLabels?: string[];

  /** Relation labels to subscribe to */
  relLabels?: string[];
}

/**
 * Control operation types.
 *
 * Maps to platform operations for control messages.
 */
@jsonSchema
enum ControlOperation {
  /** Add a subscription */
  insert: "i",

  /** Update a subscription */
  update: "u",

  /** Remove a subscription */
  delete: "d"
}

/**
 * Example usage:
 *
 * Data Event - Insert Node:
 * ```json
 * {
 *   "specversion": "1.0",
 *   "type": "drasi.change",
 *   "source": "postgres-source",
 *   "id": "event-123",
 *   "time": "2025-01-01T00:00:00Z",
 *   "datacontenttype": "application/json",
 *   "data": [
 *     {
 *       "op": "i",
 *       "payload": {
 *         "after": {
 *           "id": "user_123",
 *           "labels": ["User", "Person"],
 *           "properties": {
 *             "name": "John Doe",
 *             "email": "john@example.com",
 *             "age": 30
 *           }
 *         },
 *         "source": {
 *           "db": "myapp",
 *           "table": "node",
 *           "ts_ns": 1704067200000000000
 *         }
 *       },
 *       "reactivatorStart_ns": 1704067200000000000,
 *       "reactivatorEnd_ns": 1704067200100000000
 *     }
 *   ]
 * }
 * ```
 *
 * Data Event - Update Relation:
 * ```json
 * {
 *   "data": [
 *     {
 *       "op": "u",
 *       "payload": {
 *         "before": {
 *           "id": "follows_1",
 *           "labels": ["FOLLOWS"],
 *           "startId": "user_123",
 *           "endId": "user_456",
 *           "properties": {
 *             "since": "2024-01-01"
 *           }
 *         },
 *         "after": {
 *           "id": "follows_1",
 *           "labels": ["FOLLOWS"],
 *           "startId": "user_123",
 *           "endId": "user_456",
 *           "properties": {
 *             "since": "2024-01-01",
 *             "weight": 5
 *           }
 *         },
 *         "source": {
 *           "db": "myapp",
 *           "table": "rel",
 *           "ts_ns": 1704067200000000000
 *         }
 *       }
 *     }
 *   ]
 * }
 * ```
 *
 * Data Event - Delete:
 * ```json
 * {
 *   "data": [
 *     {
 *       "op": "d",
 *       "payload": {
 *         "before": {
 *           "id": "user_123",
 *           "labels": ["User"]
 *         },
 *         "source": {
 *           "db": "myapp",
 *           "table": "node",
 *           "ts_ns": 1704067200000000000
 *         }
 *       }
 *     }
 *   ]
 * }
 * ```
 *
 * Control Event - Add Subscription:
 * ```json
 * {
 *   "data": [
 *     {
 *       "op": "i",
 *       "payload": {
 *         "after": {
 *           "queryId": "my-query",
 *           "queryNodeId": "my-query-instance-1",
 *           "nodeLabels": ["User", "Product"],
 *           "relLabels": ["FOLLOWS", "PURCHASED"]
 *         },
 *         "source": {
 *           "db": "Drasi",
 *           "table": "SourceSubscription",
 *           "ts_ns": 1704067200000000000
 *         }
 *       }
 *     }
 *   ]
 * }
 * ```
 *
 * Control Event - Remove Subscription:
 * ```json
 * {
 *   "data": [
 *     {
 *       "op": "d",
 *       "payload": {
 *         "before": {
 *           "queryId": "my-query",
 *           "queryNodeId": "my-query-instance-1"
 *         },
 *         "source": {
 *           "db": "Drasi",
 *           "table": "SourceSubscription",
 *           "ts_ns": 1704067200000000000
 *         }
 *       }
 *     }
 *   ]
 * }
 * ```
 */