zeph-memory 0.19.1

Semantic memory with SQLite and Qdrant for Zeph agent
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Memory snapshot export and import.
//!
//! A [`MemorySnapshot`] serializes the full set of conversations, messages, and
//! summaries from `SQLite` into a JSON document suitable for backup or migration.
//!
//! Import is idempotent: conversations, messages, and summaries already present in
//! the target database are skipped and counted in [`ImportStats::skipped`].

use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
use zeph_db::sql;

use zeph_db::ActiveDialect;

use crate::error::MemoryError;
use crate::store::SqliteStore;
use crate::types::ConversationId;

/// Complete serializable snapshot of the `SQLite` memory store.
///
/// Produced by [`export_snapshot`] and consumed by [`import_snapshot`]; the `serde`
/// derives are what allow it to be written to and read back from a document.
#[derive(Debug, Serialize, Deserialize)]
pub struct MemorySnapshot {
    /// Schema version for forward-compatibility checks.
    ///
    /// [`export_snapshot`] always writes `1`, and [`import_snapshot`] rejects any
    /// other value with an error.
    pub version: u32,
    /// RFC 3339 timestamp when the snapshot was created.
    ///
    /// [`export_snapshot`] fills this in as `YYYY-MM-DDTHH:MM:SSZ`.
    pub exported_at: String,
    /// All conversations with their messages and summaries.
    ///
    /// [`export_snapshot`] emits them in ascending conversation `id` order.
    pub conversations: Vec<ConversationSnapshot>,
}

/// One conversation and all its associated data in a [`MemorySnapshot`].
///
/// On import the conversation row is created with this exact `id` when no row with
/// that `id` exists yet; its messages and summaries are then inserted one by one.
#[derive(Debug, Serialize, Deserialize)]
pub struct ConversationSnapshot {
    /// `SQLite` row ID of the conversation.
    pub id: i64,
    /// All messages in this conversation.
    ///
    /// [`export_snapshot`] emits them in ascending message `id` order.
    pub messages: Vec<MessageSnapshot>,
    /// Compression summaries for this conversation.
    ///
    /// [`export_snapshot`] obtains them from `SqliteStore::load_summaries`.
    pub summaries: Vec<SummarySnapshot>,
}

/// A single message as stored in a [`MemorySnapshot`].
///
/// On import the row is inserted with its original `id`; a row whose insert affects
/// no rows is counted as skipped instead of imported.
#[derive(Debug, Serialize, Deserialize)]
pub struct MessageSnapshot {
    /// `SQLite` row ID.
    pub id: i64,
    /// Parent conversation ID.
    ///
    /// [`export_snapshot`] sets this to the `id` of the enclosing
    /// [`ConversationSnapshot`].
    pub conversation_id: i64,
    /// Message role (`"user"`, `"assistant"`, `"system"`).
    pub role: String,
    /// Flattened text content.
    pub content: String,
    /// JSON-encoded `Vec<MessagePart>` payload.
    ///
    /// Read from, and written back to, the `parts` column of the `messages` table.
    pub parts_json: String,
    /// Unix timestamp (seconds since epoch).
    ///
    /// NOTE(review): [`import_snapshot`] does not include this value in its
    /// `INSERT`, so an imported row presumably receives the column's database
    /// default rather than the original timestamp — confirm against the schema.
    pub created_at: i64,
}

/// A compression summary record in a [`MemorySnapshot`].
///
/// The fields mirror, in order, the tuple elements that [`export_snapshot`] receives
/// from `SqliteStore::load_summaries`, with ID newtypes unwrapped to plain `i64`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SummarySnapshot {
    /// `SQLite` row ID.
    pub id: i64,
    /// Parent conversation ID.
    pub conversation_id: i64,
    /// Summary text.
    pub content: String,
    /// Inclusive lower bound of the summarised message range.
    ///
    /// `None` when the stored summary carries no lower bound.
    pub first_message_id: Option<i64>,
    /// Inclusive upper bound of the summarised message range.
    ///
    /// `None` when the stored summary carries no upper bound.
    pub last_message_id: Option<i64>,
    /// Estimated token count of the summary content.
    pub token_estimate: i64,
}

/// Counters returned by [`import_snapshot`].
///
/// All counters start at zero (via `Default`) and are only ever incremented
/// while a snapshot is being imported.
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of conversations inserted.
    pub conversations_imported: usize,
    /// Number of messages inserted.
    pub messages_imported: usize,
    /// Number of summaries inserted.
    pub summaries_imported: usize,
    /// Number of rows skipped because they already existed.
    ///
    /// This is a single combined total: pre-existing conversations, messages, and
    /// summaries each add one to it.
    pub skipped: usize,
}

/// Export all conversations, messages and summaries from `SQLite` into a snapshot.
///
/// Conversations are read in ascending `id` order. For each conversation its
/// messages are fetched in ascending `id` order and its summaries are loaded through
/// `SqliteStore::load_summaries`. The returned snapshot carries `version` 1 and an
/// `exported_at` timestamp taken just after the conversation list was read.
///
/// NOTE(review): every query runs directly against the pool and no explicit
/// transaction is opened here, so writes happening during the export may be only
/// partially reflected — confirm whether callers rely on a consistent view.
///
/// # Errors
///
/// Returns an error if any database query fails.
pub async fn export_snapshot(sqlite: &SqliteStore) -> Result<MemorySnapshot, MemoryError> {
    let conv_ids: Vec<(i64,)> =
        zeph_db::query_as(sql!("SELECT id FROM conversations ORDER BY id ASC"))
            .fetch_all(sqlite.pool())
            .await?;

    let exported_at = chrono_now();
    let mut conversations = Vec::with_capacity(conv_ids.len());

    // The message query text does not depend on which conversation is being
    // processed (the ID is a bound parameter), so build it once up front instead
    // of re-formatting and re-rewriting it on every loop iteration.
    let epoch_expr = <ActiveDialect as zeph_db::dialect::Dialect>::epoch_from_col("created_at");
    let msg_sql = zeph_db::rewrite_placeholders(&format!(
        "SELECT id, role, content, parts, {epoch_expr} \
        FROM messages WHERE conversation_id = ? ORDER BY id ASC"
    ));

    for (cid_raw,) in conv_ids {
        let cid = ConversationId(cid_raw);

        let msg_rows: Vec<(i64, String, String, String, i64)> = zeph_db::query_as(&msg_sql)
            .bind(cid)
            .fetch_all(sqlite.pool())
            .await?;

        let messages = msg_rows
            .into_iter()
            .map(
                |(id, role, content, parts_json, created_at)| MessageSnapshot {
                    id,
                    // The query is filtered by `cid`, so every row belongs to it.
                    conversation_id: cid_raw,
                    role,
                    content,
                    parts_json,
                    created_at,
                },
            )
            .collect();

        let sum_rows = sqlite.load_summaries(cid).await?;
        let summaries = sum_rows
            .into_iter()
            .map(
                |(
                    id,
                    conversation_id,
                    content,
                    first_message_id,
                    last_message_id,
                    token_estimate,
                )| {
                    // Unwrap the ID newtypes so the snapshot only holds plain integers.
                    SummarySnapshot {
                        id,
                        conversation_id: conversation_id.0,
                        content,
                        first_message_id: first_message_id.map(|m| m.0),
                        last_message_id: last_message_id.map(|m| m.0),
                        token_estimate,
                    }
                },
            )
            .collect();

        conversations.push(ConversationSnapshot {
            id: cid_raw,
            messages,
            summaries,
        });
    }

    Ok(MemorySnapshot {
        version: 1,
        exported_at,
        conversations,
    })
}

/// Import a snapshot into `SQLite`, skipping duplicate entries.
///
/// Only snapshots with `version == 1` are accepted. For every conversation in the
/// snapshot:
///
/// 1. the conversation row is inserted with its original `id` unless a row with that
///    `id` already exists, in which case it is counted as skipped;
/// 2. each message and each summary is inserted with its original `id` using the
///    active dialect's insert-or-ignore form; an insert that affects no rows is
///    counted as skipped.
///
/// The `created_at` value carried by each [`MessageSnapshot`] is not written by this
/// function.
///
/// Returns stats about what was imported.
///
/// # Errors
///
/// Returns [`MemoryError::Snapshot`] if the snapshot version is not 1, or an error if
/// any database operation fails. Statements are executed one at a time against the
/// pool without an explicit transaction, so rows inserted before a failure remain in
/// the database.
pub async fn import_snapshot(
    sqlite: &SqliteStore,
    snapshot: MemorySnapshot,
) -> Result<ImportStats, MemoryError> {
    if snapshot.version != 1 {
        return Err(MemoryError::Snapshot(format!(
            "unsupported snapshot version {}: only version 1 is supported",
            snapshot.version
        )));
    }

    // Both insert statements are identical for every row (values are bound
    // parameters), so build them once instead of once per message/summary.
    // Like the dynamically built query in `export_snapshot`, they are passed through
    // `rewrite_placeholders` so the `?` markers are adapted to the active dialect.
    let msg_sql = zeph_db::rewrite_placeholders(&format!(
        "{} INTO messages (id, conversation_id, role, content, parts) VALUES (?, ?, ?, ?, ?){}",
        <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
        <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
    ));
    let sum_sql = zeph_db::rewrite_placeholders(&format!(
        "{} INTO summaries (id, conversation_id, content, first_message_id, last_message_id, token_estimate) VALUES (?, ?, ?, ?, ?, ?){}",
        <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
        <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
    ));

    let mut stats = ImportStats::default();

    for conv in snapshot.conversations {
        let exists: Option<(i64,)> =
            zeph_db::query_as(sql!("SELECT id FROM conversations WHERE id = ?"))
                .bind(conv.id)
                .fetch_optional(sqlite.pool())
                .await?;

        if exists.is_none() {
            zeph_db::query(sql!("INSERT INTO conversations (id) VALUES (?)"))
                .bind(conv.id)
                .execute(sqlite.pool())
                .await?;
            stats.conversations_imported += 1;
        } else {
            stats.skipped += 1;
        }

        for msg in conv.messages {
            let result = zeph_db::query(&msg_sql)
                .bind(msg.id)
                .bind(msg.conversation_id)
                .bind(&msg.role)
                .bind(&msg.content)
                .bind(&msg.parts_json)
                .execute(sqlite.pool())
                .await?;

            // Zero affected rows means the insert was ignored as a duplicate.
            if result.rows_affected() > 0 {
                stats.messages_imported += 1;
            } else {
                stats.skipped += 1;
            }
        }

        for sum in conv.summaries {
            let result = zeph_db::query(&sum_sql)
                .bind(sum.id)
                .bind(sum.conversation_id)
                .bind(&sum.content)
                .bind(sum.first_message_id)
                .bind(sum.last_message_id)
                .bind(sum.token_estimate)
                .execute(sqlite.pool())
                .await?;

            // Zero affected rows means the insert was ignored as a duplicate.
            if result.rows_affected() > 0 {
                stats.summaries_imported += 1;
            } else {
                stats.skipped += 1;
            }
        }
    }

    Ok(stats)
}

/// Returns the current wall-clock time formatted as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// Built directly on `SystemTime` so no date/time crate is required. If the system
/// clock reports a time before the Unix epoch, the epoch itself is used.
fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // A clock earlier than the epoch yields `Err`; fall back to zero seconds.
    let epoch_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());

    // Break the second count into calendar fields and render them zero-padded.
    let (year, month, day, hour, min, sec) = unix_to_parts(epoch_secs);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z")
}

/// Splits a count of seconds since the Unix epoch into UTC calendar fields.
///
/// Returns `(year, month, day, hour, minute, second)` in the proleptic Gregorian
/// calendar, with `month` in `1..=12` and `day` in `1..=31`.
fn unix_to_parts(secs: u64) -> (u64, u64, u64, u64, u64, u64) {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;
    const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR;
    // Length of one full 400-year Gregorian cycle, in days.
    const DAYS_PER_ERA: u64 = 146_097;
    // Days between 0000-03-01 and 1970-01-01; shifting by this makes each
    // internal "year" start in March so the leap day falls at its very end.
    const DAYS_TO_EPOCH: u64 = 719_468;

    // Time-of-day fields come from the remainder within the current day.
    let days = secs / SECS_PER_DAY;
    let secs_of_day = secs % SECS_PER_DAY;
    let hour = secs_of_day / SECS_PER_HOUR;
    let min = (secs_of_day % SECS_PER_HOUR) / SECS_PER_MINUTE;
    let sec = secs_of_day % SECS_PER_MINUTE;

    // Civil date from the day count.
    let shifted = days + DAYS_TO_EPOCH;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, ..., 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the next calendar year in the shifted scheme.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    (year, month, day, hour, min, sec)
}

// Unit tests for snapshot export/import. Every database-backed test uses fresh
// in-memory stores (`":memory:"`), so tests do not share state.
#[cfg(test)]
mod tests {
    use super::*;

    // Exporting a store with no conversations still yields a version-1 snapshot
    // with a non-empty timestamp and an empty conversation list.
    #[tokio::test]
    async fn export_empty_database() {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let snapshot = export_snapshot(&store).await.unwrap();
        assert_eq!(snapshot.version, 1);
        assert!(snapshot.conversations.is_empty());
        assert!(!snapshot.exported_at.is_empty());
    }

    // A snapshot exported from one store and imported into an empty one
    // reproduces the conversation and its messages, in order, with nothing skipped.
    #[tokio::test]
    async fn export_import_roundtrip() {
        let src = SqliteStore::new(":memory:").await.unwrap();
        let cid = src.create_conversation().await.unwrap();
        src.save_message(cid, "user", "hello export").await.unwrap();
        src.save_message(cid, "assistant", "hi import")
            .await
            .unwrap();

        let snapshot = export_snapshot(&src).await.unwrap();
        assert_eq!(snapshot.conversations.len(), 1);
        assert_eq!(snapshot.conversations[0].messages.len(), 2);

        let dst = SqliteStore::new(":memory:").await.unwrap();
        let stats = import_snapshot(&dst, snapshot).await.unwrap();
        assert_eq!(stats.conversations_imported, 1);
        assert_eq!(stats.messages_imported, 2);
        assert_eq!(stats.skipped, 0);

        // The same conversation ID is valid in `dst` because import preserves IDs.
        let history = dst.load_history(cid, 50).await.unwrap();
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].content, "hello export");
        assert_eq!(history[1].content, "hi import");
    }

    // Importing the same data twice imports no messages the second time and
    // reports the duplicates through `skipped`.
    #[tokio::test]
    async fn import_duplicate_skips() {
        let src = SqliteStore::new(":memory:").await.unwrap();
        let cid = src.create_conversation().await.unwrap();
        src.save_message(cid, "user", "msg").await.unwrap();

        let snapshot = export_snapshot(&src).await.unwrap();

        let dst = SqliteStore::new(":memory:").await.unwrap();
        let stats1 = import_snapshot(&dst, snapshot).await.unwrap();
        assert_eq!(stats1.messages_imported, 1);

        // `import_snapshot` consumes its snapshot, so export again for the second pass.
        let snapshot2 = export_snapshot(&src).await.unwrap();
        let stats2 = import_snapshot(&dst, snapshot2).await.unwrap();
        assert_eq!(stats2.messages_imported, 0);
        assert!(stats2.skipped > 0);
    }

    // A conversation that already exists in the target must count towards
    // `skipped`, never towards `conversations_imported`.
    #[tokio::test]
    async fn import_existing_conversation_increments_skipped_not_imported() {
        let src = SqliteStore::new(":memory:").await.unwrap();
        let cid = src.create_conversation().await.unwrap();
        src.save_message(cid, "user", "only message").await.unwrap();

        let snapshot = export_snapshot(&src).await.unwrap();

        // Import once — conversation is new.
        let dst = SqliteStore::new(":memory:").await.unwrap();
        let stats1 = import_snapshot(&dst, snapshot).await.unwrap();
        assert_eq!(stats1.conversations_imported, 1);
        assert_eq!(stats1.messages_imported, 1);

        // Import again with no new messages — conversation already exists, must be counted as skipped.
        let snapshot2 = export_snapshot(&src).await.unwrap();
        let stats2 = import_snapshot(&dst, snapshot2).await.unwrap();
        assert_eq!(
            stats2.conversations_imported, 0,
            "existing conversation must not be counted as imported"
        );
        // The conversation itself contributes one skipped, plus the duplicate message.
        assert!(
            stats2.skipped >= 1,
            "re-importing an existing conversation must increment skipped"
        );
    }

    // Summaries saved for a conversation appear in that conversation's exported
    // snapshot with their content intact.
    #[tokio::test]
    async fn export_includes_summaries() {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let cid = store.create_conversation().await.unwrap();
        let m1 = store.save_message(cid, "user", "a").await.unwrap();
        let m2 = store.save_message(cid, "assistant", "b").await.unwrap();
        store
            .save_summary(cid, "summary", Some(m1), Some(m2), 5)
            .await
            .unwrap();

        let snapshot = export_snapshot(&store).await.unwrap();
        assert_eq!(snapshot.conversations[0].summaries.len(), 1);
        assert_eq!(snapshot.conversations[0].summaries[0].content, "summary");
    }

    // Shape check only: the timestamp has a `T` separator and a trailing `Z`.
    // The actual date value is not asserted because it depends on the current time.
    #[test]
    fn chrono_now_not_empty() {
        let ts = chrono_now();
        assert!(ts.contains('T'));
        assert!(ts.ends_with('Z'));
    }

    // Malformed input fails at deserialization, before `import_snapshot` is involved.
    #[test]
    fn import_corrupt_json_returns_error() {
        let result = serde_json::from_str::<MemorySnapshot>("not valid json at all {{{");
        assert!(result.is_err());
    }

    // Any version other than 1 is rejected, and the error message names the
    // offending version.
    #[tokio::test]
    async fn import_unsupported_version_returns_error() {
        let store = SqliteStore::new(":memory:").await.unwrap();
        let snapshot = MemorySnapshot {
            version: 99,
            exported_at: "2026-01-01T00:00:00Z".into(),
            conversations: vec![],
        };
        let err = import_snapshot(&store, snapshot).await.unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("unsupported snapshot version 99"));
    }

    // Re-importing after the source gained one message adds only that message;
    // the pre-existing conversation and message are both counted as skipped.
    #[tokio::test]
    async fn import_partial_overlap_adds_new_messages() {
        let src = SqliteStore::new(":memory:").await.unwrap();
        let cid = src.create_conversation().await.unwrap();
        src.save_message(cid, "user", "existing message")
            .await
            .unwrap();

        let snapshot1 = export_snapshot(&src).await.unwrap();

        let dst = SqliteStore::new(":memory:").await.unwrap();
        let stats1 = import_snapshot(&dst, snapshot1).await.unwrap();
        assert_eq!(stats1.messages_imported, 1);

        // Grow the source after the first import so the second snapshot overlaps
        // the target only partially.
        src.save_message(cid, "assistant", "new reply")
            .await
            .unwrap();
        let snapshot2 = export_snapshot(&src).await.unwrap();
        let stats2 = import_snapshot(&dst, snapshot2).await.unwrap();

        assert_eq!(
            stats2.messages_imported, 1,
            "only the new message should be imported"
        );
        // skipped includes the existing conversation (1) plus the duplicate message (1).
        assert_eq!(
            stats2.skipped, 2,
            "existing conversation and duplicate message should be skipped"
        );

        let history = dst.load_history(cid, 50).await.unwrap();
        assert_eq!(history.len(), 2);
        assert_eq!(history[1].content, "new reply");
    }
}