Skip to main content

pi/
session_sqlite.rs

1use crate::error::{Error, Result};
2use crate::session::{SessionEntry, SessionHeader};
3use crate::session_metrics;
4use sqlmodel_core::{Error as SqliteError, Row as SqliteRow, Value as SqliteValue};
5use sqlmodel_sqlite::{OpenFlags, SqliteConfig, SqliteConnection};
6use std::fmt::Write as _;
7use std::path::{Path, PathBuf};
8
/// Bootstrap script run by the read-write entry points (`save_session`,
/// `append_entries`) right after opening the database.
///
/// It sets `journal_mode = WAL`, `synchronous = NORMAL`, and
/// `foreign_keys = ON`, then creates the three session tables when they do not
/// exist yet:
///
/// * `pi_session_header` — `id` primary key plus the header as JSON text.
/// * `pi_session_entries` — one JSON text per entry, keyed by integer `seq`.
/// * `pi_session_meta` — string key/value pairs (this file reads and writes the
///   `message_count` and `name` keys).
const INIT_SQL: &str = r"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS pi_session_header (
  id TEXT PRIMARY KEY,
  json TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS pi_session_entries (
  seq INTEGER PRIMARY KEY,
  json TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS pi_session_meta (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL
);
";
29
/// Summary of a SQLite-backed session, as returned by [`load_session_meta`].
#[derive(Debug, Clone)]
pub struct SqliteSessionMeta {
    /// The parsed and validated session header.
    pub header: SessionHeader,
    /// Number of `SessionEntry::Message` entries, taken from the
    /// `message_count` meta row or recomputed from the stored entries.
    pub message_count: u64,
    /// Session name, if one is recorded; `None` when the session is unnamed.
    pub name: Option<String>,
}
36
37fn map_sqlite_result<T>(result: std::result::Result<T, SqliteError>) -> Result<T> {
38    result.map_err(|err| Error::session(format!("SQLite session error: {err}")))
39}
40
41fn open_sqlite_connection_read_only(path: &Path) -> Result<SqliteConnection> {
42    let config = SqliteConfig::file(path.to_string_lossy()).flags(OpenFlags::read_only());
43    map_sqlite_result(SqliteConnection::open(&config))
44}
45
46fn open_sqlite_connection_read_write(path: &Path) -> Result<SqliteConnection> {
47    let config = SqliteConfig::file(path.to_string_lossy()).flags(OpenFlags::create_read_write());
48    map_sqlite_result(SqliteConnection::open(&config))
49}
50
51fn row_get_string(row: &SqliteRow, column: &str) -> Result<String> {
52    row.get_named::<String>(column)
53        .map_err(|err| Error::session(format!("SQLite row read failed: {err}")))
54}
55
56fn rollback_quietly(conn: &SqliteConnection) {
57    let _ = conn.execute_raw("ROLLBACK");
58}
59
60fn sqlite_artifact_paths(path: &Path) -> [PathBuf; 3] {
61    [
62        path.to_path_buf(),
63        append_sidecar_suffix(path, "-wal"),
64        append_sidecar_suffix(path, "-shm"),
65    ]
66}
67
/// Returns `path` with `suffix` appended verbatim to its final component.
///
/// The suffix is concatenated onto the raw OS string, so any existing
/// extension is kept (`db.sqlite` + `-wal` gives `db.sqlite-wal`).
fn append_sidecar_suffix(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    raw.into()
}
73
74#[cfg(unix)]
75fn set_private_permissions_if_present(path: &Path) -> Result<()> {
76    use std::os::unix::fs::PermissionsExt;
77
78    if path.try_exists().map_err(|err| Error::Io(Box::new(err)))? {
79        std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).map_err(|err| {
80            Error::session(format!(
81                "Failed to secure SQLite session artifact {}: {err}",
82                path.display()
83            ))
84        })?;
85    }
86    Ok(())
87}
88
/// Non-Unix counterpart of the permission helper: does nothing and always
/// returns `Ok(())`.
#[cfg(not(unix))]
fn set_private_permissions_if_present(_path: &Path) -> Result<()> {
    Ok(())
}
93
94fn ensure_private_sqlite_permissions(path: &Path) -> Result<()> {
95    for artifact in sqlite_artifact_paths(path) {
96        set_private_permissions_if_present(&artifact)?;
97    }
98    Ok(())
99}
100
101fn read_all_entries(conn: &SqliteConnection) -> Result<Vec<SessionEntry>> {
102    let entry_rows = map_sqlite_result(
103        conn.query_sync("SELECT json FROM pi_session_entries ORDER BY seq ASC", &[]),
104    )?;
105
106    let mut entries = Vec::with_capacity(entry_rows.len());
107    for row in entry_rows {
108        let json = row_get_string(&row, "json")?;
109        let entry: SessionEntry = serde_json::from_str(&json).map_err(|err| {
110            Error::session(format!(
111                "Failed to parse session entry: {err}\nJSON: {json}"
112            ))
113        })?;
114        entries.push(entry);
115    }
116    Ok(entries)
117}
118
119fn is_missing_meta_table_error(err: &SqliteError) -> bool {
120    err.to_string().contains("no such table: pi_session_meta")
121}
122
123fn query_session_meta_rows(conn: &SqliteConnection) -> Result<Vec<SqliteRow>> {
124    match conn.query_sync(
125        "SELECT key,value FROM pi_session_meta WHERE key IN ('message_count','name')",
126        &[],
127    ) {
128        Ok(rows) => Ok(rows),
129        Err(err) if is_missing_meta_table_error(&err) => Ok(Vec::new()),
130        Err(err) => Err(Error::session(format!(
131            "SQLite session meta query failed: {err}"
132        ))),
133    }
134}
135
136fn compute_message_count_and_name(entries: &[SessionEntry]) -> (u64, Option<String>) {
137    let mut message_count = 0u64;
138    let mut name = None;
139
140    for entry in entries {
141        match entry {
142            SessionEntry::Message(_) => message_count += 1,
143            SessionEntry::SessionInfo(info) if info.name.is_some() => {
144                name.clone_from(&info.name);
145            }
146            _ => {}
147        }
148    }
149
150    (message_count, name)
151}
152
153pub async fn load_session(path: &Path) -> Result<(SessionHeader, Vec<SessionEntry>)> {
154    let metrics = session_metrics::global();
155    let _timer = metrics.start_timer(&metrics.sqlite_load);
156
157    if !path.exists() {
158        return Err(Error::SessionNotFound {
159            path: path.display().to_string(),
160        });
161    }
162
163    let conn = open_sqlite_connection_read_only(path)?;
164
165    let header_row =
166        map_sqlite_result(conn.query_sync("SELECT json FROM pi_session_header LIMIT 1", &[]))?
167            .into_iter()
168            .next()
169            .ok_or_else(|| Error::session("SQLite session missing header row"))?;
170    let header_json = row_get_string(&header_row, "json")?;
171    let header: SessionHeader = serde_json::from_str(&header_json).map_err(|err| {
172        Error::session(format!(
173            "Failed to parse session header: {err}\nJSON: {header_json}"
174        ))
175    })?;
176    header
177        .validate()
178        .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
179
180    let entries = read_all_entries(&conn)?;
181
182    Ok((header, entries))
183}
184
185pub async fn load_session_meta(path: &Path) -> Result<SqliteSessionMeta> {
186    let metrics = session_metrics::global();
187    let _timer = metrics.start_timer(&metrics.sqlite_load_meta);
188
189    if !path.exists() {
190        return Err(Error::SessionNotFound {
191            path: path.display().to_string(),
192        });
193    }
194
195    let conn = open_sqlite_connection_read_only(path)?;
196
197    let header_row =
198        map_sqlite_result(conn.query_sync("SELECT json FROM pi_session_header LIMIT 1", &[]))?
199            .into_iter()
200            .next()
201            .ok_or_else(|| Error::session("SQLite session missing header row"))?;
202    let header_json = row_get_string(&header_row, "json")?;
203    let header: SessionHeader = serde_json::from_str(&header_json).map_err(|err| {
204        Error::session(format!(
205            "Failed to parse session header: {err}\nJSON: {header_json}"
206        ))
207    })?;
208    header
209        .validate()
210        .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
211
212    let meta_rows = query_session_meta_rows(&conn)?;
213
214    let mut message_count: Option<u64> = None;
215    let mut name: Option<String> = None;
216    for row in meta_rows {
217        let key = row_get_string(&row, "key")?;
218        let value = row_get_string(&row, "value")?;
219        match key.as_str() {
220            "message_count" => message_count = value.parse::<u64>().ok(),
221            "name" if !value.is_empty() => {
222                name = Some(value);
223            }
224            _ => {}
225        }
226    }
227
228    if message_count.is_none() || name.is_none() {
229        let entries = read_all_entries(&conn)?;
230        let (fallback_message_count, fallback_name) = compute_message_count_and_name(&entries);
231        if message_count.is_none() {
232            message_count = Some(fallback_message_count);
233        }
234        if name.is_none() {
235            name = fallback_name;
236        }
237    }
238
239    Ok(SqliteSessionMeta {
240        header,
241        message_count: message_count.unwrap_or(0),
242        name,
243    })
244}
245
// Unit tests for the SQLite session store. The module sits in the middle of
// the file (`save_session` and `append_entries` are defined after it), hence
// the `items_after_test_module` allowance.
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use crate::model::UserContent;
    use crate::session::{EntryBase, MessageEntry, SessionInfoEntry, SessionMessage};

    /// Fixed entry base (id, no parent, constant timestamp) shared by all fixtures.
    fn dummy_base() -> EntryBase {
        EntryBase {
            id: Some("test-id".to_string()),
            parent_id: None,
            timestamp: "2026-01-01T00:00:00.000Z".to_string(),
        }
    }

    /// Builds a user message entry with the text "hello".
    fn message_entry() -> SessionEntry {
        SessionEntry::Message(MessageEntry {
            base: dummy_base(),
            message: SessionMessage::User {
                content: UserContent::Text("hello".to_string()),
                timestamp: None,
            },
        })
    }

    /// Builds a session-info entry carrying the given (optional) name.
    fn session_info_entry(name: Option<String>) -> SessionEntry {
        SessionEntry::SessionInfo(SessionInfoEntry {
            base: dummy_base(),
            name,
        })
    }

    // -- compute_message_count_and_name: counting and naming rules --

    #[test]
    fn compute_counts_empty() {
        let (count, name) = compute_message_count_and_name(&[]);
        assert_eq!(count, 0);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_messages_only() {
        let entries = vec![message_entry(), message_entry(), message_entry()];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_session_info_with_name() {
        let entries = vec![
            message_entry(),
            session_info_entry(Some("My Session".to_string())),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 2);
        assert_eq!(name, Some("My Session".to_string()));
    }

    #[test]
    fn compute_counts_session_info_none_name_ignored() {
        let entries = vec![
            session_info_entry(Some("First".to_string())),
            session_info_entry(None),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        // The second SessionInfo has name=None, so it doesn't overwrite.
        assert_eq!(name, Some("First".to_string()));
    }

    #[test]
    fn compute_counts_latest_name_wins() {
        let entries = vec![
            session_info_entry(Some("First".to_string())),
            session_info_entry(Some("Second".to_string())),
        ];
        let (_, name) = compute_message_count_and_name(&entries);
        assert_eq!(name, Some("Second".to_string()));
    }

    // -- Non-message / non-session-info entries are ignored --

    #[test]
    fn compute_counts_ignores_model_change_entries() {
        use crate::session::ModelChangeEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::ModelChange(ModelChangeEntry {
                base: dummy_base(),
                provider: "anthropic".to_string(),
                model_id: "claude-sonnet-4-5".to_string(),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 2);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_label_entries() {
        use crate::session::LabelEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::Label(LabelEntry {
                base: dummy_base(),
                target_id: "some-id".to_string(),
                label: Some("important".to_string()),
            }),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_custom_entries() {
        use crate::session::CustomEntry;
        let entries = vec![
            SessionEntry::Custom(CustomEntry {
                base: dummy_base(),
                custom_type: "my_custom".to_string(),
                data: Some(serde_json::json!({"key": "value"})),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_compaction_entries() {
        use crate::session::CompactionEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::Compaction(CompactionEntry {
                base: dummy_base(),
                summary: "summary text".to_string(),
                first_kept_entry_id: "e1".to_string(),
                tokens_before: 500,
                details: None,
                from_hook: None,
            }),
            message_entry(),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_mixed_entry_types() {
        use crate::session::{CompactionEntry, CustomEntry, LabelEntry, ModelChangeEntry};
        let entries = vec![
            message_entry(),
            SessionEntry::ModelChange(ModelChangeEntry {
                base: dummy_base(),
                provider: "openai".to_string(),
                model_id: "gpt-4".to_string(),
            }),
            session_info_entry(Some("Named".to_string())),
            SessionEntry::Label(LabelEntry {
                base: dummy_base(),
                target_id: "t1".to_string(),
                label: None,
            }),
            message_entry(),
            SessionEntry::Compaction(CompactionEntry {
                base: dummy_base(),
                summary: "s".to_string(),
                first_kept_entry_id: "e1".to_string(),
                tokens_before: 100,
                details: None,
                from_hook: None,
            }),
            SessionEntry::Custom(CustomEntry {
                base: dummy_base(),
                custom_type: "ct".to_string(),
                data: None,
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert_eq!(name, Some("Named".to_string()));
    }

    // -- map_sqlite_result tests --

    #[test]
    fn map_sqlite_result_ok() {
        let result = map_sqlite_result::<i32>(Ok(42));
        assert_eq!(result.unwrap(), 42);
    }

    #[test]
    fn map_sqlite_result_err() {
        // A path containing a NUL byte is used to obtain a real open failure.
        let config = SqliteConfig::file("bad\0path").flags(OpenFlags::create_read_write());
        let result = map_sqlite_result::<i32>(SqliteConnection::open(&config).map(|_| 42));
        let err = result.unwrap_err();
        match err {
            Error::Session(message) => {
                assert!(message.contains("SQLite session error"));
            }
            other => unreachable!("Unexpected error: {:?}", other),
        }
    }

    // -- SqliteSessionMeta struct --

    #[test]
    fn sqlite_session_meta_fields() {
        let meta = SqliteSessionMeta {
            header: SessionHeader {
                id: "test-session".to_string(),
                ..SessionHeader::default()
            },
            message_count: 42,
            name: Some("My Session".to_string()),
        };
        assert_eq!(meta.header.id, "test-session");
        assert_eq!(meta.message_count, 42);
        assert_eq!(meta.name.as_deref(), Some("My Session"));
    }

    #[test]
    fn sqlite_session_meta_no_name() {
        let meta = SqliteSessionMeta {
            header: SessionHeader::default(),
            message_count: 0,
            name: None,
        };
        assert_eq!(meta.message_count, 0);
        assert!(meta.name.is_none());
    }

    // -- compute_message_count_and_name: large input --

    #[test]
    fn compute_counts_large_message_set() {
        let entries: Vec<SessionEntry> = (0..1000).map(|_| message_entry()).collect();
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1000);
        assert!(name.is_none());
    }

    // -- compute_message_count_and_name: name then messages only --

    #[test]
    fn compute_counts_name_set_early_persists() {
        let entries = vec![
            session_info_entry(Some("Early Name".to_string())),
            message_entry(),
            message_entry(),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert_eq!(name, Some("Early Name".to_string()));
    }

    // -- compute_message_count_and_name: branch summary entry --

    #[test]
    fn compute_counts_ignores_branch_summary() {
        use crate::session::BranchSummaryEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::BranchSummary(BranchSummaryEntry {
                base: dummy_base(),
                from_id: "parent-id".to_string(),
                summary: "branch summary".to_string(),
                details: None,
                from_hook: None,
            }),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    // -- compute_message_count_and_name: thinking level change --

    #[test]
    fn compute_counts_ignores_thinking_level_change() {
        use crate::session::ThinkingLevelChangeEntry;
        let entries = vec![
            SessionEntry::ThinkingLevelChange(ThinkingLevelChangeEntry {
                base: dummy_base(),
                thinking_level: "high".to_string(),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    // -- save_session / load_session_meta: header validation --

    #[test]
    fn save_session_rejects_semantically_invalid_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid.sqlite");
        let header = SessionHeader {
            r#type: "note".to_string(),
            ..SessionHeader::default()
        };

        let err = futures::executor::block_on(async { save_session(&path, &header, &[]).await })
            .expect_err("invalid header should fail");
        let message = err.to_string();
        assert!(
            message.contains("Invalid session header"),
            "expected invalid session header error, got {message}"
        );
    }

    #[test]
    fn load_session_meta_rejects_semantically_invalid_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid.sqlite");
        let header = SessionHeader {
            id: "sqlite-test".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async { save_session(&path, &header, &[]).await })
            .expect("save sqlite session");

        // Overwrite the stored header behind the store's back with one whose
        // `type` is "note", which the load path is expected to reject.
        let invalid_header = SessionHeader {
            r#type: "note".to_string(),
            ..header
        };
        let invalid_json =
            serde_json::to_string(&invalid_header).expect("serialize invalid session header");
        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_sync(
            "UPDATE pi_session_header SET json = ?1",
            &[sqlmodel_core::Value::Text(invalid_json)],
        )
        .expect("corrupt sqlite header row");

        let err = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect_err("invalid header should fail");
        let message = err.to_string();
        assert!(
            message.contains("Invalid session header"),
            "expected invalid session header error, got {message}"
        );
    }

    // -- load_session_meta: fallbacks when meta rows or the meta table are missing --

    #[test]
    fn load_session_meta_falls_back_to_entries_when_name_row_missing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("missing-name-row.sqlite");
        let header = SessionHeader {
            id: "sqlite-name-fallback".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Recovered Name".to_string())),
            message_entry(),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        // Remove only the `name` meta row; `message_count` stays in place.
        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_sync(
            "DELETE FROM pi_session_meta WHERE key = ?1",
            &[SqliteValue::Text("name".to_string())],
        )
        .expect("delete name meta row");

        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load sqlite meta");
        assert_eq!(meta.message_count, 2);
        assert_eq!(meta.name.as_deref(), Some("Recovered Name"));
    }

    #[test]
    fn load_session_meta_falls_back_when_meta_table_missing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("missing-meta-table.sqlite");
        let header = SessionHeader {
            id: "sqlite-missing-meta".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Recovered From Entries".to_string())),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        // Drop the whole meta table so both values must come from the entries.
        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_raw("DROP TABLE pi_session_meta")
            .expect("drop sqlite meta table");

        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load sqlite meta");
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.name.as_deref(), Some("Recovered From Entries"));
    }

    #[test]
    fn load_session_meta_rejects_invalid_meta_table_schema() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid-meta-schema.sqlite");
        let header = SessionHeader {
            id: "sqlite-invalid-meta-schema".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async {
            save_session(&path, &header, &[message_entry()]).await
        })
        .expect("save sqlite session");

        // Recreate the meta table without its `value` column: the table exists,
        // so the query error is not the tolerated "no such table" case.
        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_raw("DROP TABLE pi_session_meta")
            .expect("drop sqlite meta table");
        conn.execute_raw("CREATE TABLE pi_session_meta (key TEXT PRIMARY KEY)")
            .expect("create invalid sqlite meta table");

        let err = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect_err("invalid meta schema should fail");
        let message = err.to_string();
        assert!(
            message.contains("SQLite session meta query failed"),
            "expected meta query error, got {message}"
        );
    }

    // -- Unix-only: file permission behaviour --

    #[cfg(unix)]
    #[test]
    fn load_paths_accept_read_only_sqlite_files() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("readonly.sqlite");
        let header = SessionHeader {
            id: "sqlite-readonly".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Read Only".to_string())),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        // Make the database file read-only (0o444) for the duration of the loads.
        let original_mode = std::fs::metadata(&path)
            .expect("sqlite metadata")
            .permissions()
            .mode();
        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o444))
            .expect("chmod readonly sqlite");

        let (loaded_header, loaded_entries) =
            futures::executor::block_on(async { load_session(&path).await })
                .expect("load readonly sqlite session");
        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load readonly sqlite meta");

        // Restore the original mode before asserting.
        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(original_mode))
            .expect("restore sqlite permissions");

        assert_eq!(loaded_header.id, header.id);
        assert_eq!(loaded_entries.len(), entries.len());
        assert_eq!(meta.header.id, header.id);
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.name.as_deref(), Some("Read Only"));
    }

    #[cfg(unix)]
    #[test]
    fn save_session_sets_private_permissions_for_sqlite_artifacts() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("secure.sqlite");
        let header = SessionHeader {
            id: "sqlite-secure".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async {
            save_session(&path, &header, &[message_entry()]).await
        })
        .expect("save sqlite session");

        // Only artifacts that actually exist after the save are checked.
        for artifact in sqlite_artifact_paths(&path) {
            if artifact.exists() {
                let mode = std::fs::metadata(&artifact)
                    .expect("sqlite artifact metadata")
                    .permissions()
                    .mode()
                    & 0o777;
                assert_eq!(
                    mode,
                    0o600,
                    "expected private permissions for {}",
                    artifact.display()
                );
            }
        }
    }
}
769
770pub async fn save_session(
771    path: &Path,
772    header: &SessionHeader,
773    entries: &[SessionEntry],
774) -> Result<()> {
775    header
776        .validate()
777        .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
778    let metrics = session_metrics::global();
779    let _save_timer = metrics.start_timer(&metrics.sqlite_save);
780
781    if let Some(parent) = path.parent() {
782        asupersync::fs::create_dir_all(parent).await?;
783    }
784
785    let conn = open_sqlite_connection_read_write(path)?;
786    map_sqlite_result(conn.execute_raw(INIT_SQL))?;
787    ensure_private_sqlite_permissions(path)?;
788    map_sqlite_result(conn.execute_raw("BEGIN IMMEDIATE"))?;
789
790    // Serialize header + entries and track serialization time + bytes.
791    let save_result = (|| -> Result<()> {
792        map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_entries", &[]))?;
793        map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_header", &[]))?;
794        map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_meta", &[]))?;
795
796        let serialize_timer = metrics.start_timer(&metrics.sqlite_serialize);
797        let header_json = serde_json::to_string(header)?;
798        let mut total_json_bytes = header_json.len() as u64;
799
800        let mut entry_jsons = Vec::with_capacity(entries.len());
801        for entry in entries {
802            let json = serde_json::to_string(entry)?;
803            total_json_bytes += json.len() as u64;
804            entry_jsons.push(json);
805        }
806        serialize_timer.finish();
807        metrics.record_bytes(&metrics.sqlite_bytes, total_json_bytes);
808
809        map_sqlite_result(conn.execute_sync(
810            "INSERT INTO pi_session_header (id,json) VALUES (?1,?2)",
811            &[
812                SqliteValue::Text(header.id.clone()),
813                SqliteValue::Text(header_json),
814            ],
815        ))?;
816
817        let mut seq = 1_i64;
818        for chunk in entry_jsons.chunks(200) {
819            let mut sql = String::with_capacity(64 + chunk.len() * 16);
820            sql.push_str("INSERT INTO pi_session_entries (seq,json) VALUES ");
821            let mut params = Vec::with_capacity(chunk.len() * 2);
822            for (i, json) in chunk.iter().enumerate() {
823                if i > 0 {
824                    sql.push(',');
825                }
826                let _ = write!(sql, "(?{},?{})", i * 2 + 1, i * 2 + 2);
827                params.push(SqliteValue::BigInt(seq));
828                params.push(SqliteValue::Text(json.clone()));
829                seq += 1;
830            }
831            map_sqlite_result(conn.execute_sync(&sql, &params))?;
832        }
833
834        let (message_count, name) = compute_message_count_and_name(entries);
835        map_sqlite_result(conn.execute_sync(
836            "INSERT INTO pi_session_meta (key,value) VALUES (?1,?2)",
837            &[
838                SqliteValue::Text("message_count".to_string()),
839                SqliteValue::Text(message_count.to_string()),
840            ],
841        ))?;
842        let name_value = name.unwrap_or_default();
843        map_sqlite_result(conn.execute_sync(
844            "INSERT INTO pi_session_meta (key,value) VALUES (?1,?2)",
845            &[
846                SqliteValue::Text("name".to_string()),
847                SqliteValue::Text(name_value),
848            ],
849        ))?;
850
851        Ok(())
852    })();
853
854    match save_result {
855        Ok(()) => {
856            map_sqlite_result(conn.execute_raw("COMMIT"))?;
857            ensure_private_sqlite_permissions(path)?;
858            Ok(())
859        }
860        Err(err) => {
861            rollback_quietly(&conn);
862            Err(err)
863        }
864    }
865}
866
867/// Incrementally append new entries to an existing SQLite session database.
868///
869/// Only the entries in `new_entries` (starting at 1-based sequence `start_seq`)
870/// are inserted. The header row is left unchanged, while the `message_count`
871/// and `name` meta rows are upserted to reflect the current totals.
872///
873/// This avoids the DELETE+reinsert cost of [`save_session`] for the common
874/// case where a few entries are appended between saves.
875pub async fn append_entries(
876    path: &Path,
877    new_entries: &[SessionEntry],
878    start_seq: usize,
879    message_count: u64,
880    session_name: Option<&str>,
881) -> Result<()> {
882    let metrics = session_metrics::global();
883    let _timer = metrics.start_timer(&metrics.sqlite_append);
884
885    let conn = open_sqlite_connection_read_write(path)?;
886
887    // Ensure WAL mode is active and tables exist (especially pi_session_meta for old DBs).
888    map_sqlite_result(conn.execute_raw(INIT_SQL))?;
889    ensure_private_sqlite_permissions(path)?;
890    map_sqlite_result(conn.execute_raw("BEGIN IMMEDIATE"))?;
891
892    let append_result = (|| -> Result<()> {
893        // Serialize and insert only the new entries.
894        let serialize_timer = metrics.start_timer(&metrics.sqlite_serialize);
895        let mut total_json_bytes = 0u64;
896        let mut entry_jsons = Vec::with_capacity(new_entries.len());
897        for entry in new_entries {
898            let json = serde_json::to_string(entry)?;
899            total_json_bytes += json.len() as u64;
900            entry_jsons.push(json);
901        }
902        serialize_timer.finish();
903        metrics.record_bytes(&metrics.sqlite_bytes, total_json_bytes);
904
905        let mut seq = i64::try_from(start_seq)
906            .unwrap_or(i64::MAX.saturating_sub(1))
907            .saturating_add(1);
908        for chunk in entry_jsons.chunks(200) {
909            let mut sql = String::with_capacity(64 + chunk.len() * 16);
910            sql.push_str("INSERT INTO pi_session_entries (seq,json) VALUES ");
911            let mut params = Vec::with_capacity(chunk.len() * 2);
912            for (i, json) in chunk.iter().enumerate() {
913                if i > 0 {
914                    sql.push(',');
915                }
916                let _ = write!(sql, "(?{},?{})", i * 2 + 1, i * 2 + 2);
917                params.push(SqliteValue::BigInt(seq));
918                params.push(SqliteValue::Text(json.clone()));
919                seq += 1;
920            }
921            map_sqlite_result(conn.execute_sync(&sql, &params))?;
922        }
923
924        // Upsert meta counters (INSERT OR REPLACE).
925        map_sqlite_result(conn.execute_sync(
926            "INSERT OR REPLACE INTO pi_session_meta (key,value) VALUES (?1,?2)",
927            &[
928                SqliteValue::Text("message_count".to_string()),
929                SqliteValue::Text(message_count.to_string()),
930            ],
931        ))?;
932        let name_value = session_name.unwrap_or("");
933        map_sqlite_result(conn.execute_sync(
934            "INSERT OR REPLACE INTO pi_session_meta (key,value) VALUES (?1,?2)",
935            &[
936                SqliteValue::Text("name".to_string()),
937                SqliteValue::Text(name_value.to_string()),
938            ],
939        ))?;
940
941        Ok(())
942    })();
943
944    match append_result {
945        Ok(()) => {
946            map_sqlite_result(conn.execute_raw("COMMIT"))?;
947            ensure_private_sqlite_permissions(path)?;
948            Ok(())
949        }
950        Err(err) => {
951            rollback_quietly(&conn);
952            Err(err)
953        }
954    }
955}