yantrikdb-server 0.8.9

YantrikDB database server — multi-tenant cognitive memory with wire protocol, HTTP gateway, replication, auto-failover, and at-rest encryption
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
//! `TombstoneIndex` — in-memory index of tombstoned `(tenant_id, rid)`
//! pairs, implementing [`crate::cache::TombstoneProvider`] over the
//! `memory_commit_log` table.
//!
//! See module docs in [`super`] for the design rationale.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::RwLock;
use rusqlite::Connection;

use crate::cache::TombstoneProvider;
use crate::commit::{CommitError, CommitObserver, MemoryMutation, TenantId};

/// In-memory index of `(tenant_id, rid)` pairs that have been tombstoned.
///
/// The type itself is not `Clone`; share a single instance by wrapping it
/// in an `Arc` (this is what `from_commit_log` returns). All state lives
/// behind one `parking_lot::RwLock`, so lookups from many readers can run
/// concurrently. Writers (`record` / `record_purge`) take the write lock
/// only for a single `HashSet` insert or remove, which keeps the window
/// in which readers have to wait short.
pub struct TombstoneIndex {
    /// Tombstoned rids, grouped by the tenant that owns them.
    inner: RwLock<HashMap<TenantId, HashSet<String>>>,
}

impl TombstoneIndex {
    /// Create an empty index. Useful for tests that don't need hydration.
    pub fn new() -> Self {
        Self {
            inner: RwLock::new(HashMap::new()),
        }
    }

    /// Remove `rid` from `tenant_id`'s set and drop the set once it becomes
    /// empty.
    ///
    /// Shared by hydration and the live purge path so both keep the same
    /// invariant: a tenant key is present in the map only while it has at
    /// least one tombstoned rid. Without the pruning step a fully purged
    /// tenant would linger as an empty set and be counted by
    /// [`Self::tenant_count`].
    ///
    /// Removing a rid (or tenant) that is not present is a no-op.
    fn remove_rid(
        tenants: &mut HashMap<TenantId, HashSet<String>>,
        tenant_id: &TenantId,
        rid: &str,
    ) {
        let now_empty = match tenants.get_mut(tenant_id) {
            Some(set) => {
                set.remove(rid);
                set.is_empty()
            }
            None => false,
        };
        if now_empty {
            tenants.remove(tenant_id);
        }
    }

    /// Hydrate the index from the commit log.
    ///
    /// Scans every `TombstoneMemory` and `PurgeMemory` row of
    /// `memory_commit_log` across all tenants. A `TombstoneMemory` row adds
    /// its rid to the tenant's set; a `PurgeMemory` row cancels an earlier
    /// tombstone for the same rid (the rid is physically gone — no further
    /// filtering needed). Rows are read ordered by `tenant_id` and then
    /// `log_index`, so within a tenant a purge is applied after the
    /// tombstones that precede it in the log.
    ///
    /// Linear in the number of tombstone + purge entries. Intended to be
    /// called once at startup, before recall traffic is admitted.
    ///
    /// # Errors
    ///
    /// Returns [`CommitError::StorageFailure`] when:
    /// - the scan statement cannot be prepared or executed,
    /// - a row cannot be read,
    /// - a payload does not deserialize into a [`MemoryMutation`],
    /// - the `op_kind` column disagrees with the variant stored in the
    ///   payload.
    pub fn from_commit_log(conn: &Connection) -> Result<Arc<Self>, CommitError> {
        let mut stmt = conn
            .prepare(
                "SELECT tenant_id, op_kind, payload
                 FROM memory_commit_log
                 WHERE op_kind IN ('TombstoneMemory', 'PurgeMemory')
                 ORDER BY tenant_id ASC, log_index ASC",
            )
            .map_err(|e| CommitError::StorageFailure {
                message: format!("prepare tombstone scan failed: {e}"),
            })?;

        let mut tenants: HashMap<TenantId, HashSet<String>> = HashMap::new();

        let rows = stmt
            .query_map([], |row| {
                let tenant_id_raw: i64 = row.get(0)?;
                let op_kind: String = row.get(1)?;
                let payload: Vec<u8> = row.get(2)?;
                Ok((tenant_id_raw, op_kind, payload))
            })
            .map_err(|e| CommitError::StorageFailure {
                message: format!("tombstone scan query failed: {e}"),
            })?;

        for row_result in rows {
            let (tenant_id_raw, op_kind, payload) =
                row_result.map_err(|e| CommitError::StorageFailure {
                    message: format!("tombstone scan row failed: {e}"),
                })?;
            let tenant_id = TenantId::new(tenant_id_raw);
            let mutation: MemoryMutation =
                serde_json::from_slice(&payload).map_err(|e| CommitError::StorageFailure {
                    message: format!(
                        "tombstone payload deserialize failed for tenant {tenant_id}: {e}"
                    ),
                })?;
            match (op_kind.as_str(), mutation) {
                ("TombstoneMemory", MemoryMutation::TombstoneMemory { rid, .. }) => {
                    tenants.entry(tenant_id).or_default().insert(rid);
                }
                ("PurgeMemory", MemoryMutation::PurgeMemory { rid, .. }) => {
                    // Also prunes the tenant's set when this purge empties
                    // it, so the hydrated map never holds empty sets.
                    Self::remove_rid(&mut tenants, &tenant_id, &rid);
                }
                // op_kind column and payload kind disagreed — payload is
                // the source of truth, but we surface this as a hard error
                // because it indicates corruption or grammar drift that
                // would silently skew the index.
                (kind, payload_mut) => {
                    return Err(CommitError::StorageFailure {
                        message: format!(
                            "op_kind/payload mismatch in commit log: column={kind} payload={}",
                            payload_mut.variant_name()
                        ),
                    });
                }
            }
        }

        Ok(Arc::new(Self {
            inner: RwLock::new(tenants),
        }))
    }

    /// Record a freshly-committed tombstone. Called by the committer
    /// after a successful `TombstoneMemory` commit so subsequent reads
    /// see the tombstone immediately.
    ///
    /// Recording the same `(tenant_id, rid)` twice leaves the index
    /// unchanged.
    pub fn record(&self, tenant_id: TenantId, rid: String) {
        self.inner.write().entry(tenant_id).or_default().insert(rid);
    }

    /// Drop a rid from the index after a `PurgeMemory` commit succeeds.
    ///
    /// Idempotent: removing a rid that's not present is a no-op. When the
    /// purge removes the tenant's last tombstone, the tenant's entry is
    /// dropped as well so it no longer contributes to
    /// [`Self::tenant_count`] or holds memory.
    pub fn record_purge(&self, tenant_id: TenantId, rid: &str) {
        let mut tenants = self.inner.write();
        Self::remove_rid(&mut *tenants, &tenant_id, rid);
    }

    /// Number of tombstoned rids for a tenant, or `0` when the tenant has
    /// none. Used by `/metrics`.
    pub fn tenant_size(&self, tenant_id: TenantId) -> usize {
        self.inner
            .read()
            .get(&tenant_id)
            .map(|set| set.len())
            .unwrap_or(0)
    }

    /// Total tombstoned rids across all tenants. Used by `/metrics`.
    pub fn total_size(&self) -> usize {
        self.inner.read().values().map(|set| set.len()).sum()
    }

    /// Number of distinct tenants with at least one tombstone. Used by
    /// `/metrics`.
    ///
    /// The map length is the right answer because a tenant key exists only
    /// while its set is non-empty: `record` creates a set only when
    /// inserting into it, and every purge path removes a set as soon as it
    /// is emptied.
    pub fn tenant_count(&self) -> usize {
        self.inner.read().len()
    }
}

impl Default for TombstoneIndex {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl TombstoneProvider for TombstoneIndex {
    /// Report whether `rid` is currently tombstoned for `tenant_id`.
    ///
    /// A tenant with no entry in the index has no tombstones, so the
    /// answer for it is always `false`. The read lock is taken and
    /// released inside this call; nothing is awaited while it is held.
    async fn is_tombstoned(&self, tenant_id: TenantId, rid: &str) -> bool {
        let tenants = self.inner.read();
        match tenants.get(&tenant_id) {
            Some(rids) => rids.contains(rid),
            None => false,
        }
    }
}

impl CommitObserver for TombstoneIndex {
    /// Mirror a committed mutation into the in-memory index.
    ///
    /// Runs on every commit. A mutation that reports a tombstoned rid adds
    /// it to the tenant's set; one that reports a purged rid removes it.
    /// Any other mutation leaves the index untouched. Per the observer
    /// contract this is invoked by `LocalSqliteCommitter` once the SQL
    /// transaction has committed durably.
    fn after_commit(&self, tenant_id: TenantId, mutation: &MemoryMutation) {
        // Tombstone first, purge second — same order as the commit log
        // replay applies them.
        if let Some(dead_rid) = mutation.tombstoned_rid() {
            let owned_rid = dead_rid.to_string();
            self.record(tenant_id, owned_rid);
        }

        if let Some(gone_rid) = mutation.purged_rid() {
            self.record_purge(tenant_id, gone_rid);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::{CommitOptions, LocalSqliteCommitter, MutationCommitter, OpId};
    use crate::migrations::MigrationRunner;

    /// `TombstoneMemory` mutation for `rid` with fixed filler fields.
    fn tombstone(rid: &str) -> MemoryMutation {
        MemoryMutation::TombstoneMemory {
            rid: rid.into(),
            reason: Some("test".into()),
            requested_at_unix_micros: 0,
        }
    }

    /// `UpsertMemory` mutation for `rid` with fixed filler fields.
    fn upsert(rid: &str) -> MemoryMutation {
        MemoryMutation::UpsertMemory {
            rid: rid.into(),
            text: "x".into(),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 168.0,
            namespace: "default".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "user".into(),
            emotional_state: None,
            embedding: None,
            metadata: serde_json::json!({}),
        }
    }

    /// Fresh temp directory plus the path of the commit database inside
    /// it. The returned `TempDir` must be kept alive for as long as the
    /// path is used.
    fn scratch_db() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("commit.db");
        (dir, path)
    }

    /// Commit `mutation` for `tenant` with default options, panicking on
    /// failure.
    async fn commit_ok(
        committer: &LocalSqliteCommitter,
        tenant: TenantId,
        mutation: MemoryMutation,
    ) {
        committer
            .commit(tenant, mutation, CommitOptions::default())
            .await
            .unwrap();
    }

    /// Open a fresh connection to `db_path`, run pending migrations and
    /// rebuild the index from the persisted commit log.
    fn hydrate(db_path: &std::path::Path) -> Arc<TombstoneIndex> {
        let mut conn = rusqlite::Connection::open(db_path).unwrap();
        MigrationRunner::run_pending(&mut conn).unwrap();
        TombstoneIndex::from_commit_log(&conn).unwrap()
    }

    #[tokio::test]
    async fn empty_index_reports_nothing_tombstoned() {
        let index = TombstoneIndex::new();
        let seen = index.is_tombstoned(TenantId::new(1), "mem_a").await;
        assert!(!seen);
        assert_eq!(index.total_size(), 0);
        assert_eq!(index.tenant_count(), 0);
    }

    #[tokio::test]
    async fn record_then_query_returns_true() {
        let index = TombstoneIndex::new();
        let t1 = TenantId::new(1);
        index.record(t1, "mem_a".into());
        assert!(index.is_tombstoned(t1, "mem_a").await);
        assert!(!index.is_tombstoned(t1, "mem_b").await);
    }

    #[tokio::test]
    async fn tenant_scope_is_respected() {
        // Multi-tenant isolation: a rid tombstoned under tenant 1 must not
        // show up when tenant 2 asks about the same rid.
        let index = TombstoneIndex::new();
        let (t1, t2) = (TenantId::new(1), TenantId::new(2));
        index.record(t1, "mem_a".into());
        assert!(index.is_tombstoned(t1, "mem_a").await);
        assert!(!index.is_tombstoned(t2, "mem_a").await);
    }

    #[tokio::test]
    async fn record_purge_removes_from_index() {
        let index = TombstoneIndex::new();
        let t1 = TenantId::new(1);
        index.record(t1, "mem_a".into());
        assert!(index.is_tombstoned(t1, "mem_a").await);

        index.record_purge(t1, "mem_a");
        assert!(!index.is_tombstoned(t1, "mem_a").await);

        // A second purge of the same rid must be a silent no-op.
        index.record_purge(t1, "mem_a");
    }

    #[tokio::test]
    async fn any_tombstoned_walks_the_list() {
        let index = TombstoneIndex::new();
        let t1 = TenantId::new(1);
        index.record(t1, "mem_b".into());

        let with_hit: Vec<String> = ["mem_a", "mem_b", "mem_c"]
            .iter()
            .map(|rid| rid.to_string())
            .collect();
        assert!(index.any_tombstoned(t1, &with_hit).await);

        // No rid in the list is tombstoned → false.
        let without_hit: Vec<String> = ["mem_a", "mem_c"]
            .iter()
            .map(|rid| rid.to_string())
            .collect();
        assert!(!index.any_tombstoned(t1, &without_hit).await);
    }

    #[tokio::test]
    async fn metrics_track_size_and_tenant_count() {
        let index = TombstoneIndex::new();
        let (t1, t2) = (TenantId::new(1), TenantId::new(2));
        index.record(t1, "mem_a".into());
        index.record(t1, "mem_b".into());
        index.record(t2, "mem_c".into());

        assert_eq!(index.tenant_size(t1), 2);
        assert_eq!(index.tenant_size(t2), 1);
        assert_eq!(index.tenant_size(TenantId::new(99)), 0);
        assert_eq!(index.total_size(), 3);
        assert_eq!(index.tenant_count(), 2);
    }

    #[tokio::test]
    async fn from_commit_log_hydrates_existing_tombstones() {
        // End-to-end: upsert and tombstone through the committer, tear the
        // committer down, rebuild the index from the persisted log and
        // check the tombstones survived.
        let (_tmp, db_path) = scratch_db();
        let tenant = TenantId::new(1);

        let committer = LocalSqliteCommitter::open(&db_path).unwrap();
        commit_ok(&committer, tenant, upsert("mem_a")).await;
        commit_ok(&committer, tenant, tombstone("mem_a")).await;
        commit_ok(&committer, tenant, tombstone("mem_b")).await;
        // Drop the committer so the WAL is checkpointed.
        drop(committer);

        // Only the index is rebuilt, over a fresh connection.
        let index = hydrate(&db_path);

        assert!(index.is_tombstoned(tenant, "mem_a").await);
        assert!(index.is_tombstoned(tenant, "mem_b").await);
        assert!(!index.is_tombstoned(tenant, "mem_c").await);
        assert_eq!(index.tenant_size(tenant), 2);
    }

    #[tokio::test]
    async fn from_commit_log_handles_empty_log() {
        let (_tmp, db_path) = scratch_db();
        // Nothing is committed; opening the committer just brings the DB
        // up so migrations run.
        let committer = LocalSqliteCommitter::open(&db_path).unwrap();
        drop(committer);

        let index = hydrate(&db_path);
        assert_eq!(index.total_size(), 0);
        assert_eq!(index.tenant_count(), 0);
    }

    #[tokio::test]
    async fn from_commit_log_skips_non_tombstone_entries() {
        // Upsert/Update rows must be ignored by hydration; only Tombstone
        // rows (and Purge rows, as cancellations) count.
        let (_tmp, db_path) = scratch_db();
        let t1 = TenantId::new(1);

        let committer = LocalSqliteCommitter::open(&db_path).unwrap();
        commit_ok(&committer, t1, upsert("mem_a")).await;
        commit_ok(&committer, t1, upsert("mem_b")).await;
        drop(committer);

        let index = hydrate(&db_path);
        assert_eq!(index.total_size(), 0);
        assert!(!index.is_tombstoned(t1, "mem_a").await);
    }

    #[tokio::test]
    async fn from_commit_log_isolates_per_tenant() {
        let (_tmp, db_path) = scratch_db();
        let (t1, t2) = (TenantId::new(1), TenantId::new(2));

        let committer = LocalSqliteCommitter::open(&db_path).unwrap();
        commit_ok(&committer, t1, tombstone("mem_a")).await;
        commit_ok(&committer, t2, tombstone("mem_b")).await;
        drop(committer);

        let index = hydrate(&db_path);
        assert!(index.is_tombstoned(t1, "mem_a").await);
        assert!(!index.is_tombstoned(t1, "mem_b").await);
        assert!(index.is_tombstoned(t2, "mem_b").await);
        assert!(!index.is_tombstoned(t2, "mem_a").await);
        assert_eq!(index.tenant_count(), 2);
    }

    #[tokio::test]
    async fn dyn_dispatch_works() {
        // The index has to be usable as `Arc<dyn TombstoneProvider>` so
        // TombstoneAwareCache can hold it without naming the concrete type.
        let t1 = TenantId::new(1);
        let index = TombstoneIndex::new();
        index.record(t1, "mem_a".into());

        let provider: Arc<dyn TombstoneProvider> = Arc::new(index);
        assert!(provider.is_tombstoned(t1, "mem_a").await);
        assert!(!provider.is_tombstoned(t1, "mem_b").await);

        // The trait's list helper is reachable through the trait object too.
        let rids = vec!["mem_a".to_string()];
        assert!(provider.any_tombstoned(t1, &rids).await);
    }

    #[tokio::test]
    async fn purge_after_tombstone_via_log_clears_index() {
        // Hydration must reflect: tombstone followed by purge → no entry.
        let (_tmp, db_path) = scratch_db();
        let tenant = TenantId::new(1);

        // The tombstone goes through the committer. PurgeMemory is
        // grammar-level only at this point — the committer apply path
        // will reject it (RFC 011 PR-3 ships the apply path) — so the
        // purge row is written with raw SQL further down. That keeps the
        // hydration logic as the thing under test.
        let committer = LocalSqliteCommitter::open(&db_path).unwrap();
        commit_ok(&committer, tenant, tombstone("mem_a")).await;
        drop(committer);

        // Hand-insert the PurgeMemory row, simulating what the future
        // RFC 011 PR-3 apply path would have committed.
        let raw = rusqlite::Connection::open(&db_path).unwrap();
        let purge_payload = serde_json::to_vec(&MemoryMutation::PurgeMemory {
            rid: "mem_a".into(),
            purge_epoch: 1,
        })
        .unwrap();
        let op_id = OpId::new_random();
        raw.execute(
            "INSERT INTO memory_commit_log (
                tenant_id, log_index, term, op_id, op_kind, payload,
                wire_version_major, wire_version_minor,
                schema_table, schema_version,
                committed_at_unix_micros, applied_at_unix_micros
             ) VALUES (?1, ?2, 0, ?3, 'PurgeMemory', ?4, 1, 0, 'memory_commit_log', 1, 0, NULL)",
            rusqlite::params![tenant.0, 99_i64, op_id.to_string(), purge_payload],
        )
        .unwrap();
        drop(raw);

        // After hydration the purged rid must be absent from the index.
        let index = hydrate(&db_path);
        assert!(!index.is_tombstoned(tenant, "mem_a").await);
        assert_eq!(index.tenant_size(tenant), 0);
    }
}