yantrikdb-server 0.8.17

YantrikDB database server — multi-tenant cognitive memory with wire protocol, HTTP gateway, replication, auto-failover, and at-rest encryption
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//! `MutationCommitter` trait — the abstraction that lets us swap
//! `LocalSqliteCommitter` ↔ `RaftCommitter` without changing API code.
//!
//! Every API write (`/v1/remember`, `/v1/forget`, `/v1/relate`, ...) calls
//! `committer.commit(...)` instead of mutating storage directly. The
//! committer is responsible for: assigning a `(term, log_index)`,
//! persisting the mutation durably, applying it to the state machine,
//! and returning a receipt the caller can use to verify durability.

use std::time::SystemTime;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use super::mutation::{MemoryMutation, OpId, TenantId};
use crate::version::VersionError;

/// Receipt returned from a successful commit. Carries enough info that
/// the caller can verify durability and look up the mutation later in
/// the commit log.
///
/// The type is serde-serializable and implements `Eq`, so a receipt can
/// be round-tripped through a wire format and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommitReceipt {
    /// Identifier of the committed operation: either the client-supplied
    /// `CommitOptions::op_id` or one generated by the committer when the
    /// caller passed `None`.
    pub op_id: OpId,
    /// Tenant whose log the mutation was committed to.
    pub tenant_id: TenantId,
    /// Raft-style term. Always 0 for `LocalSqliteCommitter` since there's
    /// no leadership; populated by `RaftCommitter` (RFC 010 PR-4).
    pub term: u64,
    /// Monotonic per-tenant. Increments by 1 within a tenant.
    pub log_index: u64,
    /// Timestamp recorded for the commit.
    /// NOTE(review): presumably taken when the entry became durable —
    /// confirm against the committer implementations.
    pub committed_at: SystemTime,
    /// `Some` when the mutation has been applied to the state machine;
    /// `None` if `wait_for_apply: false` was passed and the apply is
    /// still in flight.
    pub applied_at: Option<SystemTime>,
}

/// Hook invoked by a committer after every successful commit, so
/// in-process consumers (e.g. [`crate::forget::TombstoneIndex`]) can
/// stay in lock-step with the durable log without the commit layer
/// taking a hard dependency on each consumer's concrete type.
///
/// Implementations MUST be cheap and non-blocking — this runs on the
/// commit hot path. Heavier work (cache invalidation broadcasts, async
/// notifications) belongs on the
/// [`crate::cache::InvalidationBus`] which fans out off-path.
///
/// The `Send + Sync` supertraits mean an observer can be shared across
/// threads.
pub trait CommitObserver: Send + Sync {
    /// Called immediately after a commit's SQL transaction commits and
    /// the receipt has been built. Side effects observed here MUST NOT
    /// fail in a way that affects the receipt — the commit has already
    /// happened durably.
    ///
    /// `tenant_id` and `mutation` describe the commit that just
    /// completed (NOTE(review): inferred from the contract above —
    /// confirm at the committer call sites). The method returns nothing,
    /// so an implementation has no channel for reporting a failure back
    /// to the committer.
    fn after_commit(&self, tenant_id: TenantId, mutation: &MemoryMutation);
}

/// Per-call options for `commit`.
///
/// Build one with `CommitOptions::new()` (or `Default::default()`, which
/// is equivalent) and adjust it with the consuming builder methods
/// `no_wait`, `expecting_index`, and `with_op_id`.
///
/// **Default invariant** (issue #37): `Default::default()` MUST equal
/// `Self::new()`. Earlier versions derived `Default`, which gave
/// `wait_for_apply: false` (the `bool` zero value) — contradicting the
/// documented default of `true` on the field below. The production
/// handlers in `http_gateway.rs` all called `CommitOptions::default()`,
/// so single-node deployments durably appended to the commit log but
/// skipped the applier dispatch, leaving the engine state empty. Symptoms
/// were exactly as reported in yantrikos/yantrikdb#37: `/v1/remember`
/// returned `status: recorded` with a valid `log_index`, but the row
/// never appeared in `/v1/recall` and the memory count never grew.
///
/// The fix is the explicit `impl Default` below — `Self::new()` is the
/// authoritative constructor and the documented contract; `derive(Default)`
/// was the silent contradiction.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// If `Some(n)`, fail the commit unless the next assigned `log_index`
    /// would be exactly `n`. Used for compare-and-swap semantics by the
    /// few callers that need it (e.g. cluster bootstrap). Default: `None`
    /// (no constraint).
    pub expected_log_index: Option<u64>,

    /// If `true`, wait for the mutation to be applied to the state
    /// machine before returning. If `false`, return as soon as the entry
    /// is durably appended to the log; apply happens asynchronously.
    /// Default: `true` for correctness; callers like the bulk-import
    /// path can opt out for throughput.
    pub wait_for_apply: bool,

    /// Client-provided op_id for idempotent retries. When `Some`, the
    /// committer will:
    /// - Look up `(tenant_id, op_id)` in the existing log first
    /// - If found AND mutation matches → return the existing receipt
    ///   (idempotent retry, the contract that lets HTTP clients retry
    ///   on network failures without duplicating writes)
    /// - If found AND mutation differs → return `OpIdCollision` (client bug)
    /// - If not found → assign a new log_index, persist with this op_id
    ///
    /// When `None`, the committer auto-generates a fresh UUIDv7. Use
    /// `None` for server-internal commits where retry-deduplication
    /// isn't needed.
    pub op_id: Option<super::mutation::OpId>,
}

impl Default for CommitOptions {
    /// Matches `Self::new()` per the issue #37 fix. **Do not** revert to
    /// `derive(Default)` — `bool::default()` is `false`, which produces
    /// a silent doc-vs-code contradiction with `wait_for_apply`'s
    /// documented `true` default and silently breaks every
    /// production caller of `CommitOptions::default()` (regression test
    /// in `http_integration.rs` locks this for the production handlers).
    fn default() -> Self {
        Self::new()
    }
}

impl CommitOptions {
    /// Creates the default options: no `expected_log_index` constraint,
    /// `wait_for_apply` enabled, and no client-supplied `op_id`.
    ///
    /// This is the authoritative constructor; `Default::default()`
    /// delegates here.
    #[must_use]
    pub fn new() -> Self {
        Self {
            expected_log_index: None,
            wait_for_apply: true,
            op_id: None,
        }
    }

    /// Returns the options with `wait_for_apply` switched off, so the
    /// commit returns once the entry is durably appended rather than
    /// after it has been applied to the state machine.
    ///
    /// Consumes `self`; the returned value must be used, otherwise the
    /// change is lost.
    #[must_use]
    pub fn no_wait(mut self) -> Self {
        self.wait_for_apply = false;
        self
    }

    /// Returns the options with `expected_log_index` set to `Some(idx)`,
    /// requesting compare-and-swap semantics: the commit fails unless
    /// the next assigned `log_index` is exactly `idx`.
    ///
    /// Consumes `self`; the returned value must be used, otherwise the
    /// change is lost.
    #[must_use]
    pub fn expecting_index(mut self, idx: u64) -> Self {
        self.expected_log_index = Some(idx);
        self
    }

    /// Returns the options carrying the client-provided `op_id`, which
    /// enables idempotent retries of the same mutation.
    ///
    /// Consumes `self`; the returned value must be used, otherwise the
    /// change is lost.
    #[must_use]
    pub fn with_op_id(mut self, op_id: super::mutation::OpId) -> Self {
        self.op_id = Some(op_id);
        self
    }
}

/// A single committed entry in the per-tenant log. Returned by
/// [`MutationCommitter::read_range`] for replay, audit, debug-history,
/// and Jepsen fault-injection paths.
///
/// Carries the same fields as [`CommitReceipt`] plus the `mutation`
/// payload itself.
///
/// Only `PartialEq` (not `Eq`) because `MemoryMutation` carries `f64`
/// fields (importance, weight, etc.) which are `PartialEq` but not `Eq`
/// (NaN inequality). Tests that compare entries use `assert_eq!` which
/// only requires `PartialEq`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommittedEntry {
    /// Identifier of the operation that produced this entry.
    pub op_id: OpId,
    /// Tenant whose log this entry belongs to.
    pub tenant_id: TenantId,
    /// Raft-style term of the entry (see `CommitReceipt::term`).
    pub term: u64,
    /// Position of the entry in the tenant's log.
    pub log_index: u64,
    /// The mutation payload that was committed.
    pub mutation: MemoryMutation,
    /// Timestamp recorded for the commit.
    pub committed_at: SystemTime,
    /// Timestamp of the apply to the state machine, if any.
    /// NOTE(review): presumably `None` while the apply is still pending,
    /// mirroring `CommitReceipt::applied_at` — confirm.
    pub applied_at: Option<SystemTime>,
}

/// Errors returned by the [`MutationCommitter`] methods.
///
/// Each variant has a stable metrics label (`metric_label`) and a retry
/// classification (`is_retryable`); see the `impl CommitError` block.
/// The `#[error(...)]` strings are the `Display` output of each variant.
#[derive(Debug, Clone, PartialEq, Error)]
pub enum CommitError {
    /// The same `op_id` was committed previously with a different
    /// mutation payload. Callers MUST NOT change the mutation while
    /// keeping the same op_id — that's a client bug.
    #[error(
        "op_id collision for tenant {tenant_id}: existing log_index {existing_index} \
         has different mutation payload than the one being committed. Client bug: \
         the same op_id must always carry the same mutation."
    )]
    OpIdCollision {
        /// The op_id that was reused. Not included in the `Display`
        /// message above.
        op_id: OpId,
        /// Tenant in whose log the collision was detected.
        tenant_id: TenantId,
        /// `log_index` of the previously committed entry with this op_id.
        existing_index: u64,
    },

    /// `expected_log_index` mismatch (compare-and-swap failure).
    #[error(
        "expected log_index {expected} for tenant {tenant_id}, but next index is {actual}. \
         Concurrent write happened — caller should re-read state and retry."
    )]
    UnexpectedLogIndex {
        /// Tenant whose log was being appended to.
        tenant_id: TenantId,
        /// The index the caller required via `expected_log_index`.
        expected: u64,
        /// The index that would actually have been assigned next.
        actual: u64,
    },

    /// Mutation rejected by version policy (e.g. wire major mismatch
    /// during a rolling upgrade).
    ///
    /// `#[from]` lets a `VersionError` convert into this variant, so `?`
    /// works on version checks inside committer code.
    #[error("version check failed: {0}")]
    Version(#[from] VersionError),

    /// Mutation variant exists in the grammar but isn't implemented yet.
    /// `PurgeMemory` returns this until RFC 011 ships.
    #[error("mutation variant `{variant}` not yet implemented (planned in RFC {planned_rfc})")]
    NotYetImplemented {
        /// Name of the unimplemented mutation variant.
        variant: &'static str,
        /// Identifier of the RFC that plans the implementation.
        planned_rfc: &'static str,
    },

    /// Underlying storage / IO failure.
    #[error("storage failure: {message}")]
    StorageFailure { message: String },

    /// Server is shutting down — caller should NOT retry on this node.
    #[error("server shutting down")]
    Shutdown,

    /// This node is not the cluster leader. Returned by `RaftCommitter`
    /// when openraft's `client_write` reports `ForwardToLeader`. The
    /// caller (HTTP gateway, CLI client) SHOULD redirect to
    /// `leader_addr` if known, or surface a 503 with `Retry-After` if
    /// the cluster is mid-election. `leader_id` / `leader_addr` are
    /// `None` when no leader is currently known.
    #[error("not the cluster leader; redirect to leader id={leader_id:?} addr={leader_addr:?}")]
    NotLeader {
        leader_id: Option<u64>,
        leader_addr: Option<String>,
    },

    /// Commit + apply round-trip exceeded the configured timeout. The
    /// op_id is preserved so the client can retry idempotently against
    /// the leader — if the entry actually committed (timeout was
    /// spurious), the retry returns the original receipt; if it didn't,
    /// the retry creates it.
    ///
    /// Maps to HTTP 503 with the op_id in the body. Critical for
    /// network-partition recovery: blind retries without op_id-based
    /// idempotency would create duplicate writes.
    ///
    /// Defined in PR-6.6 alongside the HTTP error mapping. Not produced
    /// by any committer today; PR-6.4's RaftSubmitter will surface it
    /// when openraft's client_write round-trip times out.
    #[error("commit + apply timed out for op_id {op_id}; retry idempotently against leader")]
    CommitTimeout { op_id: OpId },
}

impl CommitError {
    /// Stable label for metrics. No user data — safe for Prometheus.
    pub fn metric_label(&self) -> &'static str {
        match self {
            CommitError::OpIdCollision { .. } => "op_id_collision",
            CommitError::UnexpectedLogIndex { .. } => "unexpected_log_index",
            CommitError::Version(_) => "version",
            CommitError::NotYetImplemented { .. } => "not_yet_implemented",
            CommitError::StorageFailure { .. } => "storage_failure",
            CommitError::Shutdown => "shutdown",
            CommitError::NotLeader { .. } => "not_leader",
            CommitError::CommitTimeout { .. } => "commit_timeout",
        }
    }

    /// Whether the caller should retry. `OpIdCollision` is a client bug
    /// (don't retry); `UnexpectedLogIndex` and `StorageFailure` are
    /// transient (retry after backoff); `Shutdown` and
    /// `NotYetImplemented` and `Version` are terminal.
    /// `NotLeader` is retryable AGAINST THE LEADER (not the same node).
    /// `CommitTimeout` is retryable BUT clients MUST reuse the op_id to
    /// stay idempotent — blind retries duplicate writes.
    pub fn is_retryable(&self) -> bool {
        matches!(
            self,
            CommitError::UnexpectedLogIndex { .. }
                | CommitError::StorageFailure { .. }
                | CommitError::NotLeader { .. }
                | CommitError::CommitTimeout { .. }
        )
    }
}

/// Trait every commit backend implements. `LocalSqliteCommitter` (RFC 010
/// PR-1+2) and `RaftCommitter` (RFC 010 PR-4) both satisfy this interface;
/// API handlers hold an `Arc<dyn MutationCommitter>` and don't care which
/// is in use.
///
/// All methods are async (via `#[async_trait]`) and report failures as
/// [`CommitError`].
#[async_trait]
pub trait MutationCommitter: Send + Sync {
    /// Commit a mutation. Idempotent on `op_id`: re-committing the same
    /// (op_id, mutation) returns the original receipt.
    ///
    /// `opts` carries the per-call knobs: the optional
    /// `expected_log_index` compare-and-swap constraint, whether to wait
    /// for the apply, and the optional client-provided `op_id`.
    ///
    /// # Errors
    ///
    /// Any [`CommitError`]. In particular, `OpIdCollision` when the
    /// `op_id` was previously committed with a different mutation, and
    /// `UnexpectedLogIndex` when `opts.expected_log_index` does not
    /// match the next index.
    async fn commit(
        &self,
        tenant_id: TenantId,
        mutation: MemoryMutation,
        opts: CommitOptions,
    ) -> Result<CommitReceipt, CommitError>;

    /// Read a range of committed entries for a tenant, starting at
    /// `from_index` (inclusive), up to `limit` entries. Used for replay,
    /// audit, debug-history, and Jepsen fault-injection paths.
    ///
    /// NOTE(review): the ordering of the returned entries is not stated
    /// here — presumably ascending `log_index`; confirm against the
    /// implementations.
    async fn read_range(
        &self,
        tenant_id: TenantId,
        from_index: u64,
        limit: usize,
    ) -> Result<Vec<CommittedEntry>, CommitError>;

    /// High watermark — max `log_index` ever assigned for this tenant.
    /// Returns 0 if the tenant has no entries yet. Used by HNSW
    /// reconciliation (RFC 013) to know how much it needs to catch up.
    async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError>;

    /// List every tenant id that has at least one entry in the commit
    /// log. Returns ids in ascending order. Used by the Raft state
    /// machine's snapshot builder (RFC 010 PR-4-c) to enumerate
    /// tenants without probing — replaces the prior O(MAX_TENANTS)
    /// linear scan.
    ///
    /// Implementations SHOULD be efficient: production callers invoke
    /// this on every snapshot build. The default `LocalSqliteCommitter`
    /// uses `SELECT DISTINCT tenant_id`.
    async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError>;

    /// Establish a linearizability barrier for subsequent reads from
    /// the local state machine.
    ///
    /// Contract:
    /// - On `Ok(())`, the caller MAY proceed to read from this node's
    ///   state machine and the result is linearizable across the
    ///   cluster as of the moment this call returned.
    /// - On `CommitError::NotLeader`, this node is not the leader; the
    ///   caller SHOULD redirect to the leader.
    /// - On any other error, the caller SHOULD treat reads from this
    ///   node as stale and retry / surface the error.
    ///
    /// The default implementation returns `Ok(())` — correct for
    /// single-node committers where every read is trivially
    /// linearizable. `RaftCommitter` overrides this to call
    /// `Raft::ensure_linearizable()` which heartbeats a quorum to
    /// confirm leadership and waits for the state machine to apply up
    /// to the read-barrier index.
    async fn ensure_linearizable(&self) -> Result<(), CommitError> {
        // Provided default: no barrier work is performed.
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for the commit-layer value types: `CommitOptions`
    //! defaults and builders, `CommitError` metric labels and retry
    //! classification, and `CommitReceipt` serde round-tripping.

    use super::*;

    #[test]
    fn commit_options_builder_pattern() {
        let opts = CommitOptions::new().no_wait().expecting_index(42);
        assert!(!opts.wait_for_apply);
        assert_eq!(opts.expected_log_index, Some(42));
        // Builders must only touch their own field.
        assert!(opts.op_id.is_none());

        // `with_op_id` sets the op_id and leaves the other defaults alone.
        let with_id = CommitOptions::new().with_op_id(OpId::new_random());
        assert!(with_id.op_id.is_some());
        assert!(with_id.wait_for_apply);
        assert_eq!(with_id.expected_log_index, None);
    }

    #[test]
    fn commit_options_default_is_safe() {
        // **Default MUST wait_for_apply=true.** Anything else is a footgun
        // that causes "I committed but recall doesn't see it" reports.
        //
        // **Issue #37 retrospective**: this test pre-existed the regression
        // but only asserted on `CommitOptions::new()`. The shipped bug was
        // that `#[derive(Default)]` produced `wait_for_apply: false` (the
        // bool zero), and every production handler in `http_gateway.rs`
        // called `CommitOptions::default()` — not `::new()`. The test name
        // ("commit_options_default_is_safe") promised the invariant; the
        // test body did not enforce it on the actual `Default` impl. Both
        // constructors are now asserted to satisfy the invariant, so a
        // future revert of the explicit `impl Default for CommitOptions`
        // in the parent module trips this.
        let from_new = CommitOptions::new();
        assert!(
            from_new.wait_for_apply,
            "Self::new() must default wait_for_apply to true"
        );
        assert_eq!(from_new.expected_log_index, None);
        assert!(from_new.op_id.is_none());

        let from_default = CommitOptions::default();
        assert!(
            from_default.wait_for_apply,
            "Default::default() must equal Self::new() — see issue #37 (do NOT revert to #[derive(Default)])"
        );
        assert_eq!(from_default.expected_log_index, None);
        assert!(from_default.op_id.is_none());
    }

    #[test]
    fn metric_labels_are_stable() {
        // Dashboards key on these strings.
        //
        // `CommitError::Version` is not exercised here: building a
        // `VersionError` needs details of `crate::version` that this
        // module does not define.
        assert_eq!(
            CommitError::OpIdCollision {
                op_id: OpId::new_random(),
                tenant_id: TenantId::new(1),
                existing_index: 0,
            }
            .metric_label(),
            "op_id_collision"
        );
        assert_eq!(
            CommitError::UnexpectedLogIndex {
                tenant_id: TenantId::new(1),
                expected: 1,
                actual: 2,
            }
            .metric_label(),
            "unexpected_log_index"
        );
        assert_eq!(
            CommitError::NotYetImplemented {
                variant: "PurgeMemory",
                planned_rfc: "011",
            }
            .metric_label(),
            "not_yet_implemented"
        );
        assert_eq!(
            CommitError::StorageFailure {
                message: "disk full".into(),
            }
            .metric_label(),
            "storage_failure"
        );
        assert_eq!(CommitError::Shutdown.metric_label(), "shutdown");
        assert_eq!(
            CommitError::NotLeader {
                leader_id: None,
                leader_addr: None,
            }
            .metric_label(),
            "not_leader"
        );
        assert_eq!(
            CommitError::CommitTimeout {
                op_id: OpId::new_random(),
            }
            .metric_label(),
            "commit_timeout"
        );
    }

    #[test]
    fn retryable_classification_is_correct() {
        // Retry on transient errors only. OpIdCollision is a CLIENT BUG —
        // retrying makes it worse. Shutdown means stop, don't retry.
        assert!(!CommitError::OpIdCollision {
            op_id: OpId::new_random(),
            tenant_id: TenantId::new(1),
            existing_index: 0,
        }
        .is_retryable());
        assert!(CommitError::UnexpectedLogIndex {
            tenant_id: TenantId::new(1),
            expected: 1,
            actual: 2,
        }
        .is_retryable());
        assert!(CommitError::StorageFailure {
            message: "transient".into(),
        }
        .is_retryable());
        assert!(!CommitError::Shutdown.is_retryable());
        assert!(!CommitError::NotYetImplemented {
            variant: "x",
            planned_rfc: "y",
        }
        .is_retryable());
        // NotLeader is retryable (against the leader), whether or not a
        // leader is currently known.
        assert!(CommitError::NotLeader {
            leader_id: None,
            leader_addr: None,
        }
        .is_retryable());
        assert!(CommitError::NotLeader {
            leader_id: Some(2),
            leader_addr: Some("10.0.0.2:7000".into()),
        }
        .is_retryable());
        // CommitTimeout is retryable provided the client reuses the op_id.
        assert!(CommitError::CommitTimeout {
            op_id: OpId::new_random(),
        }
        .is_retryable());
    }

    #[test]
    fn commit_receipt_serde_round_trip() {
        let r = CommitReceipt {
            op_id: OpId::new_random(),
            tenant_id: TenantId::new(7),
            term: 1,
            log_index: 42,
            committed_at: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            applied_at: Some(
                SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_001),
            ),
        };
        let json = serde_json::to_string(&r).unwrap();
        let back: CommitReceipt = serde_json::from_str(&json).unwrap();
        assert_eq!(r, back);
    }
}