uni-common 2.0.5

Common types, identity encoding, and schema for Uni graph database
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use std::path::PathBuf;
use thiserror::Error;

/// Error type used by this crate's `Result<T>` alias.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute;
/// fields that are not named in that attribute are carried for programmatic
/// inspection only and do not appear in the rendered message.
///
/// The enum is `#[non_exhaustive]`, so `match`es outside this crate need a
/// wildcard arm. Use [`UniError::is_retriable`] to tell transient contention
/// failures apart from deterministic ones.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum UniError {
    /// No database was found at `path`.
    #[error("Database not found: {path}")]
    NotFound { path: PathBuf },

    /// Schema-related failure; `message` carries the detail.
    #[error("Schema error: {message}")]
    Schema { message: String },

    /// Parse failure.
    ///
    /// Only `message` is rendered by `Display`. `position`, `line`, `column`
    /// and `context` are optional location details for callers.
    /// NOTE(review): units and base (byte vs. char offset, 0- vs. 1-based) are
    /// not visible in this file — confirm with the parser that fills them in.
    #[error("Parse error: {message}")]
    Parse {
        message: String,
        position: Option<usize>,
        line: Option<usize>,
        column: Option<usize>,
        context: Option<String>,
    },

    /// Query failure.
    ///
    /// Only `message` is rendered by `Display`; `query` is optional extra
    /// data (presumably the offending query text — confirm against callers).
    #[error("Query error: {message}")]
    Query {
        message: String,
        query: Option<String>,
    },

    /// Transaction failure described by `message`.
    #[error("Transaction error: {message}")]
    Transaction { message: String },

    /// Transaction conflict described by `message`.
    ///
    /// Classified as retriable by [`UniError::is_retriable`].
    #[error("Transaction conflict: {message}")]
    TransactionConflict { message: String },

    /// The transaction has already completed.
    #[error("Transaction already completed")]
    TransactionAlreadyCompleted,

    /// Operation not supported on read-only database
    #[error("Operation '{operation}' not supported on read-only database")]
    ReadOnly { operation: String },

    /// Label not found in schema
    #[error("Label '{label}' not found in schema")]
    LabelNotFound { label: String },

    /// Edge type not found in schema
    #[error("Edge type '{edge_type}' not found in schema")]
    EdgeTypeNotFound { edge_type: String },

    /// Property not found on node/edge
    #[error("Property '{property}' not found on {entity_type} with label '{label}'")]
    PropertyNotFound {
        property: String,
        entity_type: String, // "node" or "edge"
        label: String,
    },

    /// Index not found
    #[error("Index '{index}' not found")]
    IndexNotFound { index: String },

    /// Snapshot not found
    #[error("Snapshot '{snapshot_id}' not found")]
    SnapshotNotFound { snapshot_id: String },

    /// Query memory limit exceeded
    #[error("Query exceeded memory limit of {limit_bytes} bytes")]
    MemoryLimitExceeded { limit_bytes: usize },

    /// The database is locked by another process.
    #[error("Database is locked by another process")]
    DatabaseLocked,

    /// An operation timed out after `timeout_ms` milliseconds.
    ///
    /// Not classified as retriable by [`UniError::is_retriable`]; compare
    /// [`UniError::CommitTimeout`] and [`UniError::LockTimeout`], which are.
    #[error("Operation timed out after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// A Locy program stopped before reaching its least fixed point because it
    /// exceeded its wall-clock `timeout` or its `max_iterations` cap.
    ///
    /// This is the default outcome of an over-budget evaluation: partial results
    /// are *not* returned silently. The boxed [`LocyIncomplete`] carries the
    /// diagnostics (which rules were skipped, which complement rules are now
    /// unsound, how far evaluation got). The partial facts themselves are not
    /// embedded here — to recover them, re-run with `allow_partial` set, which
    /// returns `Ok` with the partial result instead of this error.
    #[error("Locy evaluation incomplete: {detail}")]
    LocyIncomplete { detail: Box<LocyIncomplete> },

    /// Type mismatch: `expected` names the required type, `actual` the one
    /// that was supplied.
    #[error("Type error: expected {expected}, got {actual}")]
    Type { expected: String, actual: String },

    /// Constraint violation described by `message`.
    #[error("Constraint violation: {message}")]
    Constraint { message: String },

    /// A transaction was aborted at commit because a concurrent transaction
    /// committed a conflicting write since this transaction began (optimistic
    /// concurrency control). The transaction may be safely retried.
    #[error("Serialization conflict: {message}")]
    SerializationConflict { message: String },

    /// A transaction was aborted at commit because a concurrent transaction
    /// committed a row with the same unique key (serializable MERGE). The
    /// transaction may be safely retried, which will observe the existing row.
    #[error("Constraint conflict: {message}")]
    ConstraintConflict { message: String },

    /// Storage failure described by `message`.
    ///
    /// The optional `source` is exposed through `std::error::Error::source`
    /// but is not part of the `Display` text.
    #[error("Storage error: {message}")]
    Storage {
        message: String,
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// I/O failure. Converts from [`std::io::Error`] via `From`, so `?` on an
    /// I/O result produces this variant.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Internal failure. Converts from `anyhow::Error` via `From`.
    #[error("Internal error: {0}")]
    Internal(#[from] anyhow::Error),

    /// `name` is not a valid identifier; `reason` explains why.
    #[error("Invalid identifier '{name}': {reason}")]
    InvalidIdentifier { name: String, reason: String },

    /// A label named `label` already exists.
    #[error("Label '{label}' already exists")]
    LabelAlreadyExists { label: String },

    /// An edge type named `edge_type` already exists.
    #[error("Edge type '{edge_type}' already exists")]
    EdgeTypeAlreadyExists { edge_type: String },

    /// Permission was denied for `action`.
    #[error("Permission denied: {action}")]
    PermissionDenied { action: String },

    /// Argument `arg` is invalid; `message` explains why.
    #[error("Argument '{arg}' is invalid: {message}")]
    InvalidArgument { arg: String, message: String },

    /// Write context (transaction, bulk writer, or appender) is already active on session.
    ///
    /// `hint` is carried for callers and is not rendered by `Display`.
    #[error("A write context is already active on session '{session_id}'")]
    WriteContextAlreadyActive {
        session_id: String,
        hint: &'static str,
    },

    /// Transaction commit timed out waiting for the global writer lock.
    ///
    /// Classified as retriable by [`UniError::is_retriable`]. `hint` is
    /// carried for callers and is not rendered by `Display`.
    #[error("Transaction '{tx_id}' commit timed out")]
    CommitTimeout { tx_id: String, hint: &'static str },

    /// A `FOR UPDATE` pessimistic row lock could not be acquired within the
    /// deadline — the holder is another live transaction (contention or a
    /// lock-ordering deadlock). Unlike a plain [`UniError::Timeout`] (a slow
    /// operation that would just time out again), this is transient: a fresh
    /// transaction can retry and win the lock once the holder releases it, so
    /// it is classified retriable. See `is_retriable`.
    #[error("FOR UPDATE lock acquisition timed out after {timeout_ms}ms")]
    LockTimeout { timeout_ms: u64 },

    /// Transaction exceeded its deadline.
    ///
    /// Not classified as retriable by [`UniError::is_retriable`]. `hint` is
    /// carried for callers and is not rendered by `Display`.
    #[error("Transaction '{tx_id}' expired")]
    TransactionExpired { tx_id: String, hint: &'static str },

    /// Operation was cancelled via a cancellation token.
    #[error("Operation cancelled")]
    Cancelled,

    /// Derived facts are stale relative to the current database version.
    #[error("Derived facts are stale: version gap is {version_gap}")]
    StaleDerivedFacts { version_gap: u64 },

    /// A Locy rule conflict was detected during transaction commit rule promotion.
    #[error("Rule conflict: rule '{rule_name}' conflicts during promotion")]
    RuleConflict { rule_name: String },

    /// A session hook rejected the operation.
    #[error("Hook rejected: {message}")]
    HookRejected { message: String },

    /// A synchronous trigger returned `TriggerOutcome::Reject` (or `Err`)
    /// during a `BeforeMutation` / `BeforeCommit` phase, aborting commit.
    #[error("Trigger '{trigger}' rejected commit: {reason}")]
    TriggerRejected { trigger: String, reason: String },

    /// Authentication failed (M5i). Raised when
    /// `Uni::session_with_credentials` cannot find a matching
    /// `AuthProvider` or the matched provider rejects the credentials.
    #[error("Authentication failed: {reason}")]
    AuthenticationFailed {
        /// Human-readable failure reason.
        reason: String,
    },

    /// An `AuthzPolicy::check` returned `Decision::Deny` for the
    /// current principal (M5i).
    #[error("Authorization denied: {reason}")]
    AuthorizationDenied {
        /// Reason from the deciding policy.
        reason: String,
    },

    /// A write was attempted against an ephemeral (transient, in-query)
    /// node or edge — i.e. one whose `Vid` / `Eid` has the
    /// `EPHEMERAL_BIT` set. Ephemeral entities are return-only
    /// projections; SET / DELETE / MERGE against them must fail before
    /// they reach storage (M5g / proposal §4.13.1).
    #[error("Cannot mutate ephemeral {kind} {id}: ephemeral entities are return-only")]
    EphemeralWriteAttempt {
        /// `"node"` or `"edge"`.
        kind: &'static str,
        /// Transient id (bottom 63 bits) for diagnostic output.
        id: u64,
    },

    /// Fork with the given name does not exist in the registry.
    #[error("Fork '{name}' not found")]
    ForkNotFound { name: String },

    /// `session.fork(name).new_()` was called against an existing fork.
    #[error("Fork '{name}' already exists")]
    ForkAlreadyExists { name: String },

    /// Phase-1 gate: writes through `forked_session.tx()` are blocked
    /// until Phase 2 lands. Reads, `locy()`, and admin paths work.
    #[error(
        "Writes on a forked session are not yet supported (Phase 2); reads, locy, and admin paths work"
    )]
    ForkWritesNotYetSupported,

    /// Drop refused because forked sessions are still alive on the fork.
    #[error("Fork '{name}' is held by {holder_count} live session(s); drop refused")]
    ForkInUse { name: String, holder_count: usize },

    /// Drop refused because a transaction has uncommitted mutations on the
    /// fork. Commit or roll back the transaction first, then retry drop.
    #[error("Fork '{name}' has uncommitted transaction state; commit or rollback first")]
    ForkInflightTx { name: String },

    /// Drop refused because the fork has pending async flushes that did
    /// not drain within `UniConfig::drop_fork_drain_timeout`. Either retry
    /// later (the streams will eventually complete) or raise the timeout.
    #[error("Fork '{name}' has pending flushes that did not drain within timeout")]
    PendingFlushTimeout { name: String },

    /// Registry on disk is malformed (corrupt JSON, missing required field, etc.).
    #[error("Fork registry is corrupt: {message}")]
    ForkCorruptRegistry { message: String },

    /// Drop refused because this fork has nested children. Use
    /// `drop_fork_cascade` to remove the whole subtree, or drop the
    /// children individually first.
    #[error(
        "Fork '{name}' has nested children {children:?}; use drop_fork_cascade or drop them first"
    )]
    ForkHasChildren { name: String, children: Vec<String> },

    /// `drop_fork_cascade` refused because at least one fork in the
    /// subtree has live sessions or in-flight transactions. No branch
    /// has been deleted yet — the cascade is atomic at the validation
    /// step. Resolve the blockers and retry.
    #[error("Fork subtree cannot be dropped: {blockers:?}")]
    ForkSubtreeInUse { blockers: Vec<String> },

    /// `Session::fork(name)` refused because the configured `max_forks`
    /// budget is at capacity. Drop existing forks (or wait for the
    /// sweeper to reap expired ones) and retry. Counts include Active,
    /// Pending, and Tombstoned entries.
    #[error("Fork budget exceeded: {current}/{max} forks; drop one or raise UniConfig::max_forks")]
    ForkBudgetExceeded { current: usize, max: usize },

    /// 2PC step on a fork lifecycle operation failed.
    ///
    /// `stage` names the step (`registry_pending`, `create_branch`,
    /// `registry_active`, `tombstone`, `delete_branch`, `registry_clear`,
    /// `backend_unsupported`, `recovery`) so recovery and humans can
    /// triage without parsing prose.
    ///
    /// NOTE(review): the underlying error is both interpolated into the
    /// `Display` text (`{source}`) and exposed via `Error::source`, so
    /// reporters that walk the source chain will print it twice.
    #[error("Fork '{name}' lifecycle failed at stage '{stage}': {source}")]
    ForkLifecycle {
        name: String,
        stage: &'static str,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

impl UniError {
    /// Returns `true` when retrying the failed operation from scratch may succeed.
    ///
    /// Distinguishes transient contention failures — optimistic-concurrency
    /// aborts and lock/commit timeouts, which a fresh transaction can win — from
    /// deterministic failures (bad query, schema or type violation) that would
    /// fail identically on retry. This is the signal
    /// [`Session::transact_with_retry`](../../../uni_db/api/session/struct.Session.html)
    /// uses to decide whether to re-run a transaction closure.
    ///
    /// `TransactionExpired` is deliberately *not* retriable here: a fresh
    /// transaction gets a new deadline, but the helper treats deadline expiry as
    /// a caller-set budget, not a contention signal. A plain `Timeout` is
    /// likewise *not* retriable — re-running the same slow operation would just
    /// time out again; only `CommitTimeout` (lock contention at the commit point)
    /// and `LockTimeout` (a contended `FOR UPDATE` row lock / deadlock) signal
    /// retriable contention.
    ///
    /// # Examples
    /// ```
    /// use uni_common::UniError;
    ///
    /// assert!(UniError::SerializationConflict { message: "lost update".into() }.is_retriable());
    /// assert!(!UniError::Schema { message: "no such label".into() }.is_retriable());
    /// ```
    #[must_use]
    pub fn is_retriable(&self) -> bool {
        matches!(
            self,
            UniError::SerializationConflict { .. }
                | UniError::ConstraintConflict { .. }
                | UniError::TransactionConflict { .. }
                | UniError::CommitTimeout { .. }
                | UniError::LockTimeout { .. }
        )
    }
}

/// Shorthand for [`std::result::Result`] with the error type fixed to [`UniError`].
pub type Result<T> = std::result::Result<T, UniError>;

/// Cause of a Locy evaluation ending short of its least fixed point.
///
/// Running out of wall-clock time and failing to converge are both
/// *incomplete* outcomes, yet the remedies differ: the former calls for a
/// larger timeout or a faster rule, the latter for a larger `max_iterations`
/// or a fix to a non-monotone rule. The two are therefore kept as separate
/// variants instead of being folded into a single flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocyIncompleteReason {
    /// Evaluation ran out of its wall-clock `timeout` budget partway through.
    Timeout,
    /// A recursive stratum reached `max_iterations` before converging.
    IterationLimit,
}

impl LocyIncompleteReason {
    /// Stable, machine-readable tag for this reason: `"timeout"` or
    /// `"iteration_limit"`.
    ///
    /// This is the discriminator handed to non-Rust callers (e.g. the Python
    /// bindings), which cannot match on a Rust enum.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Timeout => "timeout",
            Self::IterationLimit => "iteration_limit",
        }
    }
}

impl std::fmt::Display for LocyIncompleteReason {
    /// Writes the [`as_str`](LocyIncompleteReason::as_str) tag verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tag = self.as_str();
        out.write_str(tag)
    }
}

/// Diagnostic report for a Locy evaluation that was cut short.
///
/// Carried (boxed) by [`UniError::LocyIncomplete`] when a program runs past
/// its time or iteration budget, and also attached to a `LocyResult` when the
/// caller opts into partial results. The rule lists let a caller distinguish
/// "not evaluated" from "genuinely empty": a rule listed in `incomplete_rules`
/// or `skipped_rules` may lack facts only because evaluation stopped early, so
/// a zero-row count for such a rule is not authoritative.
///
/// # Examples
/// ```
/// use uni_common::{LocyIncomplete, LocyIncompleteReason};
///
/// let detail = LocyIncomplete {
///     reason: LocyIncompleteReason::Timeout,
///     elapsed_ms: 305_000,
///     limit_ms: 300_000,
///     max_iterations: 1000,
///     completed_strata: 2,
///     total_strata: 4,
///     incomplete_rules: vec!["upstream_reaches".into()],
///     skipped_rules: vec!["healthy_assets".into()],
///     complement_rules_affected: vec!["healthy_assets".into()],
/// };
/// assert!(detail.to_string().contains("timeout"));
/// assert!(detail.to_string().contains("UNSOUND"));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocyIncomplete {
    /// The cause of the early stop.
    pub reason: LocyIncompleteReason,
    /// Milliseconds of wall-clock time that had passed at the cutoff.
    pub elapsed_ms: u64,
    /// Configured wall-clock `timeout`, in milliseconds.
    pub limit_ms: u64,
    /// Configured `max_iterations` cap applied to recursive strata.
    pub max_iterations: usize,
    /// How many strata finished evaluating before the cutoff.
    pub completed_strata: usize,
    /// How many strata the program has in total.
    pub total_strata: usize,
    /// Rules belonging to the stratum interrupted mid-evaluation. Their facts
    /// may form only a partial fixpoint, not the least fixed point.
    pub incomplete_rules: Vec<String>,
    /// Rules belonging to strata that were never reached. They have no facts
    /// only because evaluation stopped first, not because their result is empty.
    pub skipped_rules: Vec<String>,
    /// The incomplete/skipped rules that use an `IS NOT` complement.
    /// Stratified negation over a partial relation is unsound, so results for
    /// these rules must not be trusted at all; they are listed separately for
    /// emphasis.
    pub complement_rules_affected: Vec<String>,
}

impl std::fmt::Display for LocyIncomplete {
    /// Renders a one-line summary: reason, elapsed time against the limits,
    /// strata progress and rule counts, followed by an `UNSOUND` clause naming
    /// the affected complement rules when there are any.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field forces this impl to be revisited.
        let Self {
            reason,
            elapsed_ms,
            limit_ms,
            max_iterations: max_iters,
            completed_strata: done,
            total_strata: total,
            incomplete_rules,
            skipped_rules,
            complement_rules_affected,
        } = self;
        let n_incomplete = incomplete_rules.len();
        let n_skipped = skipped_rules.len();

        write!(
            out,
            "{reason} after {elapsed_ms}ms (limit {limit_ms}ms, max_iterations {max_iters}); \
             evaluated {done}/{total} strata, {n_incomplete} rule(s) incomplete, \
             {n_skipped} rule(s) skipped"
        )?;

        // The warning clause is only emitted when a complement rule is affected.
        if complement_rules_affected.is_empty() {
            return Ok(());
        }
        write!(
            out,
            "; UNSOUND complement rule(s) affected: {:?}",
            complement_rules_affected
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every contention-style failure must be reported as retriable.
    #[test]
    fn retriable_errors_are_contention_failures() {
        let blank = String::new;
        let retriable = [
            UniError::SerializationConflict { message: blank() },
            UniError::ConstraintConflict { message: blank() },
            UniError::TransactionConflict { message: blank() },
            UniError::CommitTimeout {
                tx_id: blank(),
                hint: "",
            },
            // The holder of a contended FOR UPDATE row lock (or a deadlock
            // participant) eventually releases it, so a new transaction can win.
            UniError::LockTimeout { timeout_ms: 10_000 },
        ];
        retriable
            .iter()
            .for_each(|e| assert!(e.is_retriable(), "{e:?} should be retriable"));
    }

    /// Failures that would recur identically on retry must not be retriable.
    #[test]
    fn deterministic_errors_are_not_retriable() {
        let blank = String::new;
        let terminal = [
            UniError::Parse {
                message: blank(),
                position: None,
                line: None,
                column: None,
                context: None,
            },
            UniError::Query {
                message: blank(),
                query: None,
            },
            UniError::Schema { message: blank() },
            UniError::Constraint { message: blank() },
            UniError::InvalidArgument {
                arg: blank(),
                message: blank(),
            },
            // Deadline expiry is a budget chosen by the caller, not contention.
            UniError::TransactionExpired {
                tx_id: blank(),
                hint: "",
            },
            // The same slow operation would simply hit the timeout again.
            UniError::Timeout { timeout_ms: 1 },
        ];
        terminal
            .iter()
            .for_each(|e| assert!(!e.is_retriable(), "{e:?} should not be retriable"));
    }
}