yantrikdb-server 0.8.11

YantrikDB database server — multi-tenant cognitive memory with wire protocol, HTTP gateway, replication, auto-failover, and at-rest encryption
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
//! `RaftCommitter` — implements [`crate::commit::MutationCommitter`] by
//! routing every write through openraft's `client_write` so the
//! application gateway code (`/v1/remember`, `/v1/forget`, etc.)
//! doesn't have to know whether it's running single-node or in a Raft
//! cluster.
//!
//! ## Design
//!
//! `RaftCommitter` holds:
//! - `raft: Arc<openraft::Raft<YantrikRaftTypeConfig>>` — the assembled
//!   Raft instance. Writes go through `raft.client_write(YantrikLogEntry)`.
//! - `local: Arc<dyn MutationCommitter>` — the local committer that
//!   backs the state-machine apply path. Reads (`read_range`,
//!   `high_watermark`, `list_active_tenants`) delegate here because the
//!   state machine has already committed applied entries into this
//!   exact local log.
//!
//! ## Why reads go to the local committer instead of through Raft
//!
//! In Raft, follower reads are not linearizable unless gated through
//! the leader (or via lease/read-index protocols). For Phase 2 we
//! intentionally serve reads from the local applied state — that's
//! "stale reads OK" semantics, which matches what the existing
//! `LocalSqliteCommitter` callers were already getting in single-node
//! mode. Callers that need a linearizable read first await
//! `ensure_linearizable()`, which this type forwards to openraft's
//! `Raft::ensure_linearizable()`, and then read from the local committer.
//!
//! ## Sub-PR layout for openraft adoption
//!
//! - **PR-4-a**: types
//! - **PR-4-b**: `SqliteRaftLogStorage`
//! - **PR-4-c**: `YantrikStateMachine` + snapshot
//! - **PR-4-d-a (this)**: `RaftCommitter` + `StubRaftNetwork` (single-node only)
//! - **PR-4-d-b**: `HttpRaftNetwork` + axum receive routes (real cluster transport)
//! - **PR-4-e**: 3-node integration test, leader-kill failover, partition heal

use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use openraft::error::{CheckIsLeaderError, ClientWriteError, ForwardToLeader, RaftError};
use openraft::Raft;

use super::types::{YantrikLogEntry, YantrikNode, YantrikNodeId, YantrikRaftTypeConfig};
use crate::commit::{
    CommitError, CommitOptions, CommitReceipt, CommittedEntry, MemoryMutation, MutationCommitter,
    OpId, TenantId,
};

/// Committer that replicates writes through openraft. Cheap to clone —
/// the Raft instance and the local committer are both `Arc`-shared.
///
/// Writes (`commit`) are submitted to `raft`; the read-side trait
/// methods (`read_range`, `high_watermark`, `list_active_tenants`) are
/// answered by `local`.
#[derive(Clone)]
pub struct RaftCommitter {
    /// Handle to the assembled Raft instance. `commit` submits entries
    /// via `client_write` and `ensure_linearizable` forwards to the
    /// method of the same name on this handle.
    raft: Arc<Raft<YantrikRaftTypeConfig>>,
    /// Local committer that backs the state machine. Reads delegate
    /// here so they reflect the state machine's applied view.
    local: Arc<dyn MutationCommitter>,
}

impl RaftCommitter {
    pub fn new(raft: Arc<Raft<YantrikRaftTypeConfig>>, local: Arc<dyn MutationCommitter>) -> Self {
        Self { raft, local }
    }

    fn map_raft_error(
        err: RaftError<YantrikNodeId, ClientWriteError<YantrikNodeId, YantrikNode>>,
    ) -> CommitError {
        // ForwardToLeader is the most common Raft error in cluster mode
        // (a client lands on a follower). Surface it as `NotLeader` so
        // the HTTP gateway / CLI can redirect rather than treating it
        // as a storage failure.
        match err {
            RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader {
                leader_id,
                leader_node,
            })) => CommitError::NotLeader {
                leader_id: leader_id.map(|id| id.raw()),
                leader_addr: leader_node.map(|n| n.addr),
            },
            other => CommitError::StorageFailure {
                message: format!("raft client_write: {other}"),
            },
        }
    }

    fn map_check_is_leader_error(
        err: RaftError<YantrikNodeId, CheckIsLeaderError<YantrikNodeId, YantrikNode>>,
    ) -> CommitError {
        match err {
            RaftError::APIError(CheckIsLeaderError::ForwardToLeader(ForwardToLeader {
                leader_id,
                leader_node,
            })) => CommitError::NotLeader {
                leader_id: leader_id.map(|id| id.raw()),
                leader_addr: leader_node.map(|n| n.addr),
            },
            other => CommitError::StorageFailure {
                message: format!("raft ensure_linearizable: {other}"),
            },
        }
    }
}

#[async_trait]
impl MutationCommitter for RaftCommitter {
    /// Replicate `mutation` for `tenant_id` through Raft and report
    /// where it landed.
    ///
    /// # Errors
    ///
    /// - `NotYetImplemented` when the mutation variant reports itself
    ///   as unimplemented; nothing is sent to Raft in that case.
    /// - Whatever [`RaftCommitter::map_raft_error`] produces when
    ///   `client_write` fails (`NotLeader` or `StorageFailure`).
    async fn commit(
        &self,
        tenant_id: TenantId,
        mutation: MemoryMutation,
        opts: CommitOptions,
    ) -> Result<CommitReceipt, CommitError> {
        // Same up-front gate as LocalSqliteCommitter: replicating a
        // grammar-only variant would only fail later, in apply, on
        // every follower.
        if !mutation.is_implemented() {
            let variant = mutation.variant_name();
            let planned_rfc = mutation.planned_rfc();
            return Err(CommitError::NotYetImplemented {
                variant,
                planned_rfc,
            });
        }

        // Use the caller's op id when one was supplied, otherwise mint one.
        let op_id = match opts.op_id {
            Some(supplied) => supplied,
            None => OpId::new_random(),
        };

        let outcome = self
            .raft
            .client_write(YantrikLogEntry::new(tenant_id, op_id, mutation))
            .await;
        let response = match outcome {
            Ok(written) => written,
            Err(raft_err) => return Err(Self::map_raft_error(raft_err)),
        };

        // The response pairs the Raft log id (term + index) with the
        // application-defined payload (YantrikRaftResponse).
        let (raft_log_id, app) = (response.log_id, response.data);

        // Negative timestamps are clamped to zero; a value that does
        // not fit in a SystemTime falls back to the epoch.
        let micros = app.applied_at_unix_micros.max(0) as u64;
        let since_epoch = std::time::Duration::from_micros(micros);
        let applied_at = match SystemTime::UNIX_EPOCH.checked_add(since_epoch) {
            Some(instant) => instant,
            None => SystemTime::UNIX_EPOCH,
        };

        Ok(CommitReceipt {
            op_id,
            tenant_id,
            term: raft_log_id.leader_id.term,
            log_index: app.tenant_log_index,
            committed_at: applied_at,
            applied_at: Some(applied_at),
        })
    }

    /// Read committed entries for `tenant_id` from the local committer.
    ///
    /// Stale reads are acceptable here: the local committer holds
    /// every entry the state machine has applied on this node.
    async fn read_range(
        &self,
        tenant_id: TenantId,
        from_index: u64,
        limit: usize,
    ) -> Result<Vec<CommittedEntry>, CommitError> {
        let local = &self.local;
        local.read_range(tenant_id, from_index, limit).await
    }

    /// Report the local committer's high watermark for `tenant_id`.
    async fn high_watermark(&self, tenant_id: TenantId) -> Result<u64, CommitError> {
        let local = &self.local;
        local.high_watermark(tenant_id).await
    }

    /// List the tenants known to the local committer.
    async fn list_active_tenants(&self) -> Result<Vec<TenantId>, CommitError> {
        let local = &self.local;
        local.list_active_tenants().await
    }

    /// Read barrier: once this returns `Ok`, a following read from the
    /// local committer is linearizable across the cluster.
    ///
    /// openraft heartbeats a quorum to confirm leadership and waits
    /// for the state machine to apply up to the read-barrier index.
    ///
    /// # Errors
    ///
    /// Whatever [`RaftCommitter::map_check_is_leader_error`] produces
    /// (`NotLeader` or `StorageFailure`).
    async fn ensure_linearizable(&self) -> Result<(), CommitError> {
        match self.raft.ensure_linearizable().await {
            // The value openraft hands back is not needed by callers.
            Ok(_) => Ok(()),
            Err(raft_err) => Err(Self::map_check_is_leader_error(raft_err)),
        }
    }
}

// Unit tests for `RaftCommitter`. Most tests stand up an in-memory
// single-node Raft instance; the error-mapping tests call the private
// mapping helpers directly and need no cluster.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::LocalSqliteCommitter;
    use crate::raft::log_storage::SqliteRaftLogStorage;
    use crate::raft::network::StubRaftNetworkFactory;
    use crate::raft::state_machine::YantrikStateMachine;
    use crate::raft::types::YantrikNode;
    use openraft::Config;
    use std::collections::BTreeMap;

    /// Build a single-node Raft cluster with everything wired up.
    /// Returns the assembled committer + the inner local committer
    /// (so tests can verify the apply path landed entries durably).
    ///
    /// Panics if any setup step fails or if node 1 has not become
    /// leader after the polling loop below.
    async fn build_single_node_committer() -> (RaftCommitter, Arc<LocalSqliteCommitter>) {
        // The same `local` instance backs the state machine and is
        // handed to the committer for reads.
        let local = Arc::new(LocalSqliteCommitter::open_in_memory().unwrap());
        let log_store = SqliteRaftLogStorage::open_in_memory();
        let state_machine = YantrikStateMachine::new(local.clone());
        let network = StubRaftNetworkFactory;

        // Explicit heartbeat / election timing; the values are passed
        // through `validate()` before use.
        let config = Arc::new(
            Config {
                cluster_name: "yantrikdb-test".into(),
                heartbeat_interval: 100,
                election_timeout_min: 200,
                election_timeout_max: 400,
                ..Default::default()
            }
            .validate()
            .unwrap(),
        );

        let me = YantrikNodeId::new(1);
        let raft = Arc::new(
            Raft::<YantrikRaftTypeConfig>::new(me, config, network, log_store, state_machine)
                .await
                .unwrap(),
        );

        // Initialize as a single-node cluster.
        let mut nodes = BTreeMap::new();
        nodes.insert(me, YantrikNode::new("http://127.0.0.1:0"));
        raft.initialize(nodes).await.unwrap();

        // Wait for leadership — single-node should win immediately.
        // Polls at most 30 times, sleeping 50 ms between attempts.
        for _ in 0..30 {
            if raft.current_leader().await == Some(me) {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        }
        // Fail loudly here rather than letting later commits fail for
        // lack of a leader.
        assert_eq!(raft.current_leader().await, Some(me), "should be leader");

        (RaftCommitter::new(raft, local.clone()), local)
    }

    /// Build an `UpsertMemory` mutation for `rid`. Only `rid` and the
    /// derived `text` vary; every other field is a fixed test value.
    fn upsert(rid: &str) -> MemoryMutation {
        MemoryMutation::UpsertMemory {
            rid: rid.into(),
            text: format!("text-{rid}"),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 168.0,
            namespace: "default".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "user".into(),
            emotional_state: None,
            embedding: None,
            metadata: serde_json::json!({}),
        }
    }

    /// A commit through the Raft committer yields a populated receipt
    /// and is visible in the local committer's high watermark.
    #[tokio::test]
    async fn single_node_commit_round_trips_through_raft() {
        let (cm, local) = build_single_node_committer().await;
        let receipt = cm
            .commit(TenantId::new(1), upsert("a"), CommitOptions::default())
            .await
            .unwrap();
        assert_eq!(receipt.tenant_id, TenantId::new(1));
        assert_eq!(receipt.log_index, 1);
        assert!(receipt.term >= 1, "Raft term should be >= 1");
        assert!(receipt.applied_at.is_some());

        // The local committer (state machine apply path) saw the entry.
        assert_eq!(local.high_watermark(TenantId::new(1)).await.unwrap(), 1);
    }

    /// `commit` returns `NotYetImplemented` for a `TenantConfigPatch`
    /// mutation.
    #[tokio::test]
    async fn unimplemented_variant_is_rejected_before_raft() {
        let (cm, _) = build_single_node_committer().await;
        let cfg = MemoryMutation::TenantConfigPatch {
            key: "k".into(),
            value: serde_json::Value::Null,
        };
        let err = cm
            .commit(TenantId::new(1), cfg, CommitOptions::default())
            .await
            .unwrap_err();
        assert!(matches!(err, CommitError::NotYetImplemented { .. }));
    }

    /// Two commits for one tenant are both returned by `read_range`
    /// starting at index 1.
    #[tokio::test]
    async fn read_range_delegates_to_local_committer() {
        let (cm, _) = build_single_node_committer().await;
        let _ = cm
            .commit(TenantId::new(1), upsert("a"), CommitOptions::default())
            .await
            .unwrap();
        let _ = cm
            .commit(TenantId::new(1), upsert("b"), CommitOptions::default())
            .await
            .unwrap();
        let entries = cm.read_range(TenantId::new(1), 1, 100).await.unwrap();
        assert_eq!(entries.len(), 2);
    }

    /// After three commits for one tenant the high watermark is 3.
    #[tokio::test]
    async fn high_watermark_reflects_committed_entries() {
        let (cm, _) = build_single_node_committer().await;
        for i in 0..3 {
            let _ = cm
                .commit(
                    TenantId::new(1),
                    upsert(&format!("e{i}")),
                    CommitOptions::default(),
                )
                .await
                .unwrap();
        }
        assert_eq!(cm.high_watermark(TenantId::new(1)).await.unwrap(), 3);
    }

    /// Committing twice with the same explicit `op_id` yields receipts
    /// that agree on `log_index`.
    #[tokio::test]
    async fn idempotent_op_id_returns_same_log_index() {
        let (cm, _) = build_single_node_committer().await;
        let op_id = OpId::new_random();
        let r1 = cm
            .commit(
                TenantId::new(1),
                upsert("a"),
                CommitOptions::default().with_op_id(op_id),
            )
            .await
            .unwrap();
        // The committer's apply path uses the same op_id, so the second
        // call would short-circuit on the (tenant, op_id) UNIQUE index
        // inside LocalSqliteCommitter — both receipts agree on log_index.
        let r2 = cm
            .commit(
                TenantId::new(1),
                upsert("a"),
                CommitOptions::default().with_op_id(op_id),
            )
            .await
            .unwrap();
        assert_eq!(r1.log_index, r2.log_index);
    }

    /// The first commit for each of two different tenants gets
    /// `log_index` 1.
    #[tokio::test]
    async fn per_tenant_log_index_is_independent() {
        let (cm, _) = build_single_node_committer().await;
        let r1 = cm
            .commit(TenantId::new(1), upsert("a"), CommitOptions::default())
            .await
            .unwrap();
        let r2 = cm
            .commit(TenantId::new(2), upsert("b"), CommitOptions::default())
            .await
            .unwrap();
        // Per-tenant log indices both start at 1 even though Raft term
        // and global log_index are shared.
        assert_eq!(r1.log_index, 1);
        assert_eq!(r2.log_index, 1);
    }

    /// `list_active_tenants` returns exactly the tenants that have
    /// received a commit.
    #[tokio::test]
    async fn list_active_tenants_returns_all_used_ids() {
        let (cm, _) = build_single_node_committer().await;
        // Tenants 1, 3, 7 — verify list_active_tenants surfaces exactly these.
        for t in [1, 3, 7] {
            let _ = cm
                .commit(TenantId::new(t), upsert("a"), CommitOptions::default())
                .await
                .unwrap();
        }
        // Sort before comparing so the assertion does not depend on the
        // order in which tenants are returned.
        let mut tenants = cm.list_active_tenants().await.unwrap();
        tenants.sort();
        assert_eq!(
            tenants,
            vec![TenantId::new(1), TenantId::new(3), TenantId::new(7)]
        );
    }

    /// `map_raft_error` turns a `ForwardToLeader` with a known leader
    /// into `NotLeader` carrying that leader's raw id and address.
    #[tokio::test]
    async fn forward_to_leader_translates_to_not_leader() {
        // Map an explicit ForwardToLeader RaftError to confirm the
        // translation surfaces leader_id + leader_addr — without
        // standing up a full multi-node cluster (the 3-node cluster
        // integration test exercises the live path).
        let ftl = ForwardToLeader {
            leader_id: Some(YantrikNodeId::new(7)),
            leader_node: Some(YantrikNode::new("http://10.0.0.5:7100")),
        };
        let err = RaftError::APIError(ClientWriteError::ForwardToLeader(ftl));
        let mapped = RaftCommitter::map_raft_error(err);
        match mapped {
            CommitError::NotLeader {
                leader_id,
                leader_addr,
            } => {
                assert_eq!(leader_id, Some(7));
                assert_eq!(leader_addr.as_deref(), Some("http://10.0.0.5:7100"));
            }
            other => panic!("expected NotLeader, got {other:?}"),
        }
    }

    /// `map_raft_error` keeps both `NotLeader` fields as `None` when
    /// the `ForwardToLeader` hint is empty.
    #[tokio::test]
    async fn forward_to_leader_with_unknown_leader_yields_none_fields() {
        // Mid-election: openraft returns ForwardToLeader with leader_id
        // = None. Translation must preserve that None so the caller
        // surfaces "no leader yet" rather than a bogus address.
        let ftl = ForwardToLeader::<YantrikNodeId, YantrikNode>::empty();
        let err = RaftError::APIError(ClientWriteError::ForwardToLeader(ftl));
        let mapped = RaftCommitter::map_raft_error(err);
        match mapped {
            CommitError::NotLeader {
                leader_id,
                leader_addr,
            } => {
                assert_eq!(leader_id, None);
                assert_eq!(leader_addr, None);
            }
            other => panic!("expected NotLeader, got {other:?}"),
        }
    }

    /// `ensure_linearizable` returns `Ok` on the single-node leader.
    #[tokio::test]
    async fn ensure_linearizable_succeeds_on_single_node_leader() {
        // Single-node Raft is always trivially linearizable — the
        // leader is the only node, so heartbeating "a quorum" is just
        // confirming itself.
        let (cm, _) = build_single_node_committer().await;
        cm.ensure_linearizable()
            .await
            .expect("single-node leader must always be linearizable");
    }

    /// `ensure_linearizable` on a bare `LocalSqliteCommitter` returns
    /// `Ok` without any Raft instance involved.
    #[tokio::test]
    async fn ensure_linearizable_local_committer_is_no_op() {
        // The default trait impl is correct for LocalSqliteCommitter:
        // single-node has no replication, so every read is trivially
        // linearizable.
        let local = LocalSqliteCommitter::open_in_memory().unwrap();
        local
            .ensure_linearizable()
            .await
            .expect("local committer trivially linearizable");
    }

    /// `map_check_is_leader_error` turns a `ForwardToLeader` with a
    /// known leader into `NotLeader` carrying that leader's raw id and
    /// address.
    #[tokio::test]
    async fn check_is_leader_error_translates_to_not_leader() {
        // Same pattern as ForwardToLeader → NotLeader for client_write.
        let ftl = ForwardToLeader {
            leader_id: Some(YantrikNodeId::new(3)),
            leader_node: Some(YantrikNode::new("http://10.0.0.3:7100")),
        };
        let err = RaftError::APIError(CheckIsLeaderError::ForwardToLeader(ftl));
        let mapped = RaftCommitter::map_check_is_leader_error(err);
        match mapped {
            CommitError::NotLeader {
                leader_id,
                leader_addr,
            } => {
                assert_eq!(leader_id, Some(3));
                assert_eq!(leader_addr.as_deref(), Some("http://10.0.0.3:7100"));
            }
            other => panic!("expected NotLeader, got {other:?}"),
        }
    }

    /// `CommitError::NotLeader` reports itself as retryable and uses
    /// the `"not_leader"` metric label.
    #[tokio::test]
    async fn not_leader_is_classified_retryable_against_leader() {
        // Standing acceptance: NotLeader is retryable (against a
        // different node, the leader). Distinct from OpIdCollision
        // which is a client bug and NOT retryable.
        let err = CommitError::NotLeader {
            leader_id: Some(2),
            leader_addr: Some("http://x".into()),
        };
        assert!(err.is_retryable());
        assert_eq!(err.metric_label(), "not_leader");
    }
}