Skip to main content

omnigraph_server/
api.rs

1use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot};
2use omnigraph::error::{MergeConflict, MergeConflictKind};
3use omnigraph::loader::{IngestResult, LoadMode};
4use omnigraph_compiler::SchemaMigrationStep;
5use omnigraph_compiler::result::QueryResult;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use utoipa::{IntoParams, ToSchema};
9
10/// Shadow enum for documenting [`LoadMode`] in the OpenAPI schema.
11#[derive(ToSchema)]
12#[schema(as = LoadMode)]
13#[allow(dead_code)]
14enum LoadModeSchema {
15    /// Overwrite existing data.
16    #[schema(rename = "overwrite")]
17    Overwrite,
18    /// Append to existing data.
19    #[schema(rename = "append")]
20    Append,
21    /// Merge by id key (upsert).
22    #[schema(rename = "merge")]
23    Merge,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
27pub struct SnapshotTableOutput {
28    pub table_key: String,
29    pub table_path: String,
30    pub table_version: u64,
31    pub table_branch: Option<String>,
32    pub row_count: u64,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
36pub struct SnapshotOutput {
37    pub branch: String,
38    pub manifest_version: u64,
39    pub tables: Vec<SnapshotTableOutput>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
43pub struct BranchCreateRequest {
44    /// Parent branch to fork from. Defaults to `main`.
45    pub from: Option<String>,
46    /// Name of the new branch. Must not already exist.
47    pub name: String,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
51pub struct BranchCreateOutput {
52    pub uri: String,
53    pub from: String,
54    pub name: String,
55    pub actor_id: Option<String>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
59pub struct BranchListOutput {
60    pub branches: Vec<String>,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
64pub struct BranchDeleteOutput {
65    pub uri: String,
66    pub name: String,
67    pub actor_id: Option<String>,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
71pub struct BranchMergeRequest {
72    /// Source branch whose commits will be merged.
73    pub source: String,
74    /// Target branch that will receive the merge. Defaults to `main`.
75    pub target: Option<String>,
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
79#[serde(rename_all = "snake_case")]
80pub enum BranchMergeOutcome {
81    AlreadyUpToDate,
82    FastForward,
83    Merged,
84}
85
86impl From<MergeOutcome> for BranchMergeOutcome {
87    fn from(value: MergeOutcome) -> Self {
88        match value {
89            MergeOutcome::AlreadyUpToDate => Self::AlreadyUpToDate,
90            MergeOutcome::FastForward => Self::FastForward,
91            MergeOutcome::Merged => Self::Merged,
92        }
93    }
94}
95
96impl BranchMergeOutcome {
97    pub fn as_str(self) -> &'static str {
98        match self {
99            Self::AlreadyUpToDate => "already_up_to_date",
100            Self::FastForward => "fast_forward",
101            Self::Merged => "merged",
102        }
103    }
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
107pub struct BranchMergeOutput {
108    pub source: String,
109    pub target: String,
110    pub outcome: BranchMergeOutcome,
111    pub actor_id: Option<String>,
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
115#[serde(rename_all = "snake_case")]
116pub enum MergeConflictKindOutput {
117    DivergentInsert,
118    DivergentUpdate,
119    DeleteVsUpdate,
120    OrphanEdge,
121    UniqueViolation,
122    CardinalityViolation,
123    ValueConstraintViolation,
124}
125
126impl MergeConflictKindOutput {
127    pub fn as_str(self) -> &'static str {
128        match self {
129            Self::DivergentInsert => "divergent_insert",
130            Self::DivergentUpdate => "divergent_update",
131            Self::DeleteVsUpdate => "delete_vs_update",
132            Self::OrphanEdge => "orphan_edge",
133            Self::UniqueViolation => "unique_violation",
134            Self::CardinalityViolation => "cardinality_violation",
135            Self::ValueConstraintViolation => "value_constraint_violation",
136        }
137    }
138}
139
140impl From<MergeConflictKind> for MergeConflictKindOutput {
141    fn from(value: MergeConflictKind) -> Self {
142        match value {
143            MergeConflictKind::DivergentInsert => Self::DivergentInsert,
144            MergeConflictKind::DivergentUpdate => Self::DivergentUpdate,
145            MergeConflictKind::DeleteVsUpdate => Self::DeleteVsUpdate,
146            MergeConflictKind::OrphanEdge => Self::OrphanEdge,
147            MergeConflictKind::UniqueViolation => Self::UniqueViolation,
148            MergeConflictKind::CardinalityViolation => Self::CardinalityViolation,
149            MergeConflictKind::ValueConstraintViolation => Self::ValueConstraintViolation,
150        }
151    }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
155pub struct MergeConflictOutput {
156    pub table_key: String,
157    pub row_id: Option<String>,
158    pub kind: MergeConflictKindOutput,
159    pub message: String,
160}
161
162impl From<&MergeConflict> for MergeConflictOutput {
163    fn from(value: &MergeConflict) -> Self {
164        Self {
165            table_key: value.table_key.clone(),
166            row_id: value.row_id.clone(),
167            kind: value.kind.into(),
168            message: value.message.clone(),
169        }
170    }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
174pub struct ReadTargetOutput {
175    pub branch: Option<String>,
176    pub snapshot: Option<String>,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
180pub struct ReadOutput {
181    pub query_name: String,
182    pub target: ReadTargetOutput,
183    pub row_count: usize,
184    #[serde(default, skip_serializing_if = "Vec::is_empty")]
185    pub columns: Vec<String>,
186    pub rows: Value,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
190pub struct ChangeOutput {
191    pub branch: String,
192    pub query_name: String,
193    pub affected_nodes: usize,
194    pub affected_edges: usize,
195    pub actor_id: Option<String>,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
199pub struct IngestTableOutput {
200    pub table_key: String,
201    pub rows_loaded: usize,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
205pub struct IngestOutput {
206    pub uri: String,
207    pub branch: String,
208    pub base_branch: String,
209    pub branch_created: bool,
210    #[schema(value_type = LoadModeSchema)]
211    pub mode: LoadMode,
212    pub tables: Vec<IngestTableOutput>,
213    pub actor_id: Option<String>,
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
217pub struct CommitOutput {
218    pub graph_commit_id: String,
219    pub manifest_branch: Option<String>,
220    pub manifest_version: u64,
221    pub parent_commit_id: Option<String>,
222    pub merged_parent_commit_id: Option<String>,
223    pub actor_id: Option<String>,
224    /// Commit creation time as Unix epoch microseconds.
225    #[schema(example = 1714000000000000i64)]
226    pub created_at: i64,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
230pub struct CommitListOutput {
231    pub commits: Vec<CommitOutput>,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
235pub struct ReadRequest {
236    /// GQ query source. May declare one or more named queries; pick one with
237    /// `query_name` if there is more than one.
238    #[schema(
239        example = "query get_person($name: String) {\n    match {\n        $p: Person { name: $name }\n    }\n    return { $p.name, $p.age }\n}"
240    )]
241    pub query_source: String,
242    /// Name of the query to run when `query_source` declares multiple. Optional
243    /// when only one query is declared.
244    pub query_name: Option<String>,
245    /// JSON object whose keys match the query's declared parameters.
246    pub params: Option<Value>,
247    /// Branch to read from. Mutually exclusive with `snapshot`. Defaults to `main`.
248    pub branch: Option<String>,
249    /// Snapshot id to read from. Mutually exclusive with `branch`.
250    pub snapshot: Option<String>,
251}
252
253/// Inline read-query request for `POST /query`.
254///
255/// Friendlier-named alternative to [`ReadRequest`] for ad-hoc reads and
256/// AI-agent integration. Mutations are rejected with 400 — use `POST
257/// /mutate` (or its deprecated alias `POST /change`) for write queries.
258/// Field names are deliberately short (`query`, `name`) to match the GQ
259/// keyword and the CLI `-e` flag.
260#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
261pub struct QueryRequest {
262    /// GQ read-query source. May declare one or more named queries; pick one
263    /// with `name` when more than one is declared. Mutations
264    /// (`insert`/`update`/`delete`) get 400 — use `POST /mutate` (or its
265    /// deprecated alias `POST /change`) instead.
266    #[schema(example = "query get_person($name: String) {\n    match {\n        $p: Person { name: $name }\n    }\n    return { $p.name, $p.age }\n}")]
267    pub query: String,
268    /// Name of the query to run when `query` declares multiple. Optional when
269    /// only one query is declared.
270    pub name: Option<String>,
271    /// JSON object whose keys match the query's declared parameters.
272    pub params: Option<Value>,
273    /// Branch to read from. Mutually exclusive with `snapshot`. Defaults to `main`.
274    pub branch: Option<String>,
275    /// Snapshot id to read from. Mutually exclusive with `branch`.
276    pub snapshot: Option<String>,
277}
278
279#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
280pub struct ChangeRequest {
281    /// GQ mutation source containing `insert`, `update`, or `delete` statements.
282    /// May declare multiple named mutations; pick one with `name`.
283    ///
284    /// Accepts the legacy field name `query_source` as a deserialization alias.
285    #[schema(
286        example = "query insert_person($name: String, $age: I32) {\n    insert Person { name: $name, age: $age }\n}"
287    )]
288    #[serde(alias = "query_source")]
289    pub query: String,
290    /// Name of the mutation to run when `query` declares multiple.
291    ///
292    /// Accepts the legacy field name `query_name` as a deserialization alias.
293    #[serde(default, alias = "query_name")]
294    pub name: Option<String>,
295    /// JSON object whose keys match the mutation's declared parameters.
296    #[serde(default)]
297    pub params: Option<Value>,
298    /// Target branch. Defaults to `main`.
299    #[serde(default)]
300    pub branch: Option<String>,
301}
302
303#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
304pub struct SchemaApplyRequest {
305    /// Project schema in `.pg` source form. The diff against the current
306    /// schema produces the migration steps that will be applied.
307    #[schema(
308        example = "node Person {\n    name: String @key\n    age: I32?\n}\n\nedge Knows: Person -> Person"
309    )]
310    pub schema_source: String,
311    /// When true, promote every `DropMode::Soft` step in the plan to
312    /// `DropMode::Hard`, making the prior column data unreachable
313    /// after the apply. Matches the CLI's `--allow-data-loss` flag.
314    /// Defaults to `false` (drops remain reversible via time travel).
315    #[serde(default)]
316    pub allow_data_loss: bool,
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
320pub struct SchemaApplyOutput {
321    pub uri: String,
322    pub supported: bool,
323    pub applied: bool,
324    pub step_count: usize,
325    pub manifest_version: u64,
326    #[schema(value_type = Vec<Value>)]
327    pub steps: Vec<SchemaMigrationStep>,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
331pub struct SchemaOutput {
332    pub schema_source: String,
333}
334
335#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
336pub struct IngestRequest {
337    /// Target branch. Created from `from` if it does not yet exist. Defaults to `main`.
338    pub branch: Option<String>,
339    /// Parent branch used to create `branch` if it does not exist. Defaults to `main`.
340    pub from: Option<String>,
341    /// How existing rows are handled. Defaults to `merge`.
342    #[schema(value_type = Option<LoadModeSchema>)]
343    pub mode: Option<LoadMode>,
344    /// NDJSON payload: one record per line, each shaped
345    /// `{"type": "<TypeName>", "data": {...}}`.
346    #[schema(
347        example = "{\"type\": \"Person\", \"data\": {\"name\": \"Alice\", \"age\": 30}}\n{\"type\": \"Person\", \"data\": {\"name\": \"Bob\", \"age\": 25}}"
348    )]
349    pub data: String,
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
353pub struct ExportRequest {
354    /// Branch to export. Defaults to `main`.
355    pub branch: Option<String>,
356    /// Restrict the export to these node/edge type names. Empty exports all types.
357    #[serde(default)]
358    pub type_names: Vec<String>,
359    /// Restrict the export to these table keys. Empty exports all tables.
360    #[serde(default)]
361    pub table_keys: Vec<String>,
362}
363
364#[derive(Debug, Clone, Deserialize, IntoParams)]
365pub struct SnapshotQuery {
366    pub branch: Option<String>,
367}
368
369#[derive(Debug, Clone, Deserialize, IntoParams)]
370pub struct CommitListQuery {
371    pub branch: Option<String>,
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
375pub struct HealthOutput {
376    pub status: String,
377    pub version: String,
378    #[serde(skip_serializing_if = "Option::is_none")]
379    pub source_version: Option<String>,
380}
381
382#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
383#[serde(rename_all = "snake_case")]
384pub enum ErrorCode {
385    Unauthorized,
386    Forbidden,
387    BadRequest,
388    NotFound,
389    /// 405 Method Not Allowed — the route exists but the active server
390    /// mode doesn't serve this method (e.g. `GET /graphs` in single-graph
391    /// mode). Distinct from 404 so clients can tell "wrong context" from
392    /// "no such resource."
393    MethodNotAllowed,
394    Conflict,
395    /// 429 Too Many Requests — per-actor admission cap exceeded.
396    /// Clients should respect the `Retry-After` header.
397    TooManyRequests,
398    Internal,
399}
400
401/// Structured details for a publisher-level OCC failure. Surfaces alongside
402/// HTTP 409 when a write was rejected because the caller's pre-write view of
403/// one table's manifest version was stale relative to the current head. The
404/// expected/actual fields tell the client which table to refresh.
405#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
406pub struct ManifestConflictOutput {
407    pub table_key: String,
408    pub expected: u64,
409    pub actual: u64,
410}
411
412#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
413pub struct ErrorOutput {
414    pub error: String,
415    #[serde(skip_serializing_if = "Option::is_none")]
416    pub code: Option<ErrorCode>,
417    #[serde(default, skip_serializing_if = "Vec::is_empty")]
418    pub merge_conflicts: Vec<MergeConflictOutput>,
419    /// Set when the conflict is a publisher CAS rejection
420    /// (`ManifestConflictDetails::ExpectedVersionMismatch`). The caller's
421    /// pre-write view of `table_key` was at version `expected` but the
422    /// manifest is now at `actual`. Refresh and retry.
423    #[serde(skip_serializing_if = "Option::is_none")]
424    pub manifest_conflict: Option<ManifestConflictOutput>,
425}
426
427pub fn snapshot_payload(branch: &str, snapshot: &Snapshot) -> SnapshotOutput {
428    let mut entries: Vec<_> = snapshot.entries().cloned().collect();
429    entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
430    let tables = entries
431        .iter()
432        .map(|entry| SnapshotTableOutput {
433            table_key: entry.table_key.clone(),
434            table_path: entry.table_path.clone(),
435            table_version: entry.table_version,
436            table_branch: entry.table_branch.clone(),
437            row_count: entry.row_count,
438        })
439        .collect::<Vec<_>>();
440    SnapshotOutput {
441        branch: branch.to_string(),
442        manifest_version: snapshot.version(),
443        tables,
444    }
445}
446
447pub fn schema_apply_output(uri: &str, result: SchemaApplyResult) -> SchemaApplyOutput {
448    SchemaApplyOutput {
449        uri: uri.to_string(),
450        supported: result.supported,
451        applied: result.applied,
452        step_count: result.steps.len(),
453        manifest_version: result.manifest_version,
454        steps: result.steps,
455    }
456}
457
458pub fn commit_output(commit: &GraphCommit) -> CommitOutput {
459    CommitOutput {
460        graph_commit_id: commit.graph_commit_id.clone(),
461        manifest_branch: commit.manifest_branch.clone(),
462        manifest_version: commit.manifest_version,
463        parent_commit_id: commit.parent_commit_id.clone(),
464        merged_parent_commit_id: commit.merged_parent_commit_id.clone(),
465        actor_id: commit.actor_id.clone(),
466        created_at: commit.created_at,
467    }
468}
469
470pub fn read_output(query_name: String, target: &ReadTarget, result: QueryResult) -> ReadOutput {
471    let columns = result
472        .schema()
473        .fields()
474        .iter()
475        .map(|field| field.name().clone())
476        .collect();
477    ReadOutput {
478        query_name,
479        target: read_target_output(target),
480        row_count: result.num_rows(),
481        columns,
482        rows: result.to_rust_json(),
483    }
484}
485
486pub fn ingest_output(uri: &str, result: &IngestResult, actor_id: Option<String>) -> IngestOutput {
487    IngestOutput {
488        uri: uri.to_string(),
489        branch: result.branch.clone(),
490        base_branch: result.base_branch.clone(),
491        branch_created: result.branch_created,
492        mode: result.mode,
493        tables: result
494            .tables
495            .iter()
496            .map(|table| IngestTableOutput {
497                table_key: table.table_key.clone(),
498                rows_loaded: table.rows_loaded,
499            })
500            .collect(),
501        actor_id,
502    }
503}
504
505pub fn read_target_output(target: &ReadTarget) -> ReadTargetOutput {
506    match target {
507        ReadTarget::Branch(branch) => ReadTargetOutput {
508            branch: Some(branch.clone()),
509            snapshot: None,
510        },
511        ReadTarget::Snapshot(snapshot) => ReadTargetOutput {
512            branch: None,
513            snapshot: Some(snapshot.as_str().to_string()),
514        },
515    }
516}
517
518// ─── MR-668 — management endpoint shapes ──────────────────────────────────
519
520/// One entry in the response from `GET /graphs`. Cluster operators
521/// consume this list to discover which graphs the server is currently
522/// serving. The shape is intentionally minimal — `graph_id` and `uri`
523/// are the only fields a routing client needs.
524#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
525pub struct GraphInfo {
526    pub graph_id: String,
527    pub uri: String,
528}
529
530/// Response from `GET /graphs`. Lists every graph registered with the
531/// server in alphabetical order by `graph_id` (sorted server-side so
532/// clients get deterministic output across requests).
533#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
534pub struct GraphListResponse {
535    pub graphs: Vec<GraphInfo>,
536}