1use crate::error::{Error, Result};
2use crate::session::{SessionEntry, SessionHeader};
3use crate::session_metrics;
4use sqlmodel_core::{Error as SqliteError, Row as SqliteRow, Value as SqliteValue};
5use sqlmodel_sqlite::{OpenFlags, SqliteConfig, SqliteConnection};
6use std::fmt::Write as _;
7use std::path::{Path, PathBuf};
8
/// Bootstrap SQL run by the writers in this file right after opening the database.
///
/// It selects WAL journaling with `synchronous = NORMAL`, turns on foreign keys, and
/// creates the three session tables when they do not exist yet:
/// - `pi_session_header`: header `id` plus its serialized `json`.
/// - `pi_session_entries`: serialized entry `json` keyed by an integer `seq`.
/// - `pi_session_meta`: string `key`/`value` pairs (the code in this file uses the
///   keys `message_count` and `name`).
const INIT_SQL: &str = r"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS pi_session_header (
 id TEXT PRIMARY KEY,
 json TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS pi_session_entries (
 seq INTEGER PRIMARY KEY,
 json TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS pi_session_meta (
 key TEXT PRIMARY KEY,
 value TEXT NOT NULL
);
";
29
/// Lightweight summary of a stored session, as returned by [`load_session_meta`].
#[derive(Debug, Clone)]
pub struct SqliteSessionMeta {
    /// Session header parsed from the `pi_session_header` table.
    pub header: SessionHeader,
    /// Number of message entries, read from the `message_count` meta row or counted
    /// from the stored entries when that row is unavailable.
    pub message_count: u64,
    /// Session name, if one is recorded (an empty stored name is reported as `None`).
    pub name: Option<String>,
}
36
37fn map_sqlite_result<T>(result: std::result::Result<T, SqliteError>) -> Result<T> {
38 result.map_err(|err| Error::session(format!("SQLite session error: {err}")))
39}
40
41fn open_sqlite_connection_read_only(path: &Path) -> Result<SqliteConnection> {
42 let config = SqliteConfig::file(path.to_string_lossy()).flags(OpenFlags::read_only());
43 map_sqlite_result(SqliteConnection::open(&config))
44}
45
46fn open_sqlite_connection_read_write(path: &Path) -> Result<SqliteConnection> {
47 let config = SqliteConfig::file(path.to_string_lossy()).flags(OpenFlags::create_read_write());
48 map_sqlite_result(SqliteConnection::open(&config))
49}
50
51fn row_get_string(row: &SqliteRow, column: &str) -> Result<String> {
52 row.get_named::<String>(column)
53 .map_err(|err| Error::session(format!("SQLite row read failed: {err}")))
54}
55
56fn rollback_quietly(conn: &SqliteConnection) {
57 let _ = conn.execute_raw("ROLLBACK");
58}
59
60fn sqlite_artifact_paths(path: &Path) -> [PathBuf; 3] {
61 [
62 path.to_path_buf(),
63 append_sidecar_suffix(path, "-wal"),
64 append_sidecar_suffix(path, "-shm"),
65 ]
66}
67
/// Builds a new path by appending `suffix` directly to the raw text of `path`.
///
/// No separator is inserted and the extension is not replaced, so
/// `db.sqlite` + `-wal` yields `db.sqlite-wal`.
fn append_sidecar_suffix(path: &Path, suffix: &str) -> PathBuf {
    let base = path.as_os_str();
    let mut joined = std::ffi::OsString::with_capacity(base.len() + suffix.len());
    joined.push(base);
    joined.push(suffix);
    joined.into()
}
73
74#[cfg(unix)]
75fn set_private_permissions_if_present(path: &Path) -> Result<()> {
76 use std::os::unix::fs::PermissionsExt;
77
78 if path.try_exists().map_err(|err| Error::Io(Box::new(err)))? {
79 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).map_err(|err| {
80 Error::session(format!(
81 "Failed to secure SQLite session artifact {}: {err}",
82 path.display()
83 ))
84 })?;
85 }
86 Ok(())
87}
88
/// Non-Unix counterpart of the permission-tightening helper: does nothing and
/// always succeeds.
#[cfg(not(unix))]
fn set_private_permissions_if_present(_path: &Path) -> Result<()> {
    Ok(())
}
93
94fn ensure_private_sqlite_permissions(path: &Path) -> Result<()> {
95 for artifact in sqlite_artifact_paths(path) {
96 set_private_permissions_if_present(&artifact)?;
97 }
98 Ok(())
99}
100
101fn read_all_entries(conn: &SqliteConnection) -> Result<Vec<SessionEntry>> {
102 let entry_rows = map_sqlite_result(
103 conn.query_sync("SELECT json FROM pi_session_entries ORDER BY seq ASC", &[]),
104 )?;
105
106 let mut entries = Vec::with_capacity(entry_rows.len());
107 for row in entry_rows {
108 let json = row_get_string(&row, "json")?;
109 let entry: SessionEntry = serde_json::from_str(&json).map_err(|err| {
110 Error::session(format!(
111 "Failed to parse session entry: {err}\nJSON: {json}"
112 ))
113 })?;
114 entries.push(entry);
115 }
116 Ok(entries)
117}
118
119fn is_missing_meta_table_error(err: &SqliteError) -> bool {
120 err.to_string().contains("no such table: pi_session_meta")
121}
122
123fn query_session_meta_rows(conn: &SqliteConnection) -> Result<Vec<SqliteRow>> {
124 match conn.query_sync(
125 "SELECT key,value FROM pi_session_meta WHERE key IN ('message_count','name')",
126 &[],
127 ) {
128 Ok(rows) => Ok(rows),
129 Err(err) if is_missing_meta_table_error(&err) => Ok(Vec::new()),
130 Err(err) => Err(Error::session(format!(
131 "SQLite session meta query failed: {err}"
132 ))),
133 }
134}
135
136fn compute_message_count_and_name(entries: &[SessionEntry]) -> (u64, Option<String>) {
137 let mut message_count = 0u64;
138 let mut name = None;
139
140 for entry in entries {
141 match entry {
142 SessionEntry::Message(_) => message_count += 1,
143 SessionEntry::SessionInfo(info) if info.name.is_some() => {
144 name.clone_from(&info.name);
145 }
146 _ => {}
147 }
148 }
149
150 (message_count, name)
151}
152
153pub async fn load_session(path: &Path) -> Result<(SessionHeader, Vec<SessionEntry>)> {
154 let metrics = session_metrics::global();
155 let _timer = metrics.start_timer(&metrics.sqlite_load);
156
157 if !path.exists() {
158 return Err(Error::SessionNotFound {
159 path: path.display().to_string(),
160 });
161 }
162
163 let conn = open_sqlite_connection_read_only(path)?;
164
165 let header_row =
166 map_sqlite_result(conn.query_sync("SELECT json FROM pi_session_header LIMIT 1", &[]))?
167 .into_iter()
168 .next()
169 .ok_or_else(|| Error::session("SQLite session missing header row"))?;
170 let header_json = row_get_string(&header_row, "json")?;
171 let header: SessionHeader = serde_json::from_str(&header_json).map_err(|err| {
172 Error::session(format!(
173 "Failed to parse session header: {err}\nJSON: {header_json}"
174 ))
175 })?;
176 header
177 .validate()
178 .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
179
180 let entries = read_all_entries(&conn)?;
181
182 Ok((header, entries))
183}
184
185pub async fn load_session_meta(path: &Path) -> Result<SqliteSessionMeta> {
186 let metrics = session_metrics::global();
187 let _timer = metrics.start_timer(&metrics.sqlite_load_meta);
188
189 if !path.exists() {
190 return Err(Error::SessionNotFound {
191 path: path.display().to_string(),
192 });
193 }
194
195 let conn = open_sqlite_connection_read_only(path)?;
196
197 let header_row =
198 map_sqlite_result(conn.query_sync("SELECT json FROM pi_session_header LIMIT 1", &[]))?
199 .into_iter()
200 .next()
201 .ok_or_else(|| Error::session("SQLite session missing header row"))?;
202 let header_json = row_get_string(&header_row, "json")?;
203 let header: SessionHeader = serde_json::from_str(&header_json).map_err(|err| {
204 Error::session(format!(
205 "Failed to parse session header: {err}\nJSON: {header_json}"
206 ))
207 })?;
208 header
209 .validate()
210 .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
211
212 let meta_rows = query_session_meta_rows(&conn)?;
213
214 let mut message_count: Option<u64> = None;
215 let mut name: Option<String> = None;
216 for row in meta_rows {
217 let key = row_get_string(&row, "key")?;
218 let value = row_get_string(&row, "value")?;
219 match key.as_str() {
220 "message_count" => message_count = value.parse::<u64>().ok(),
221 "name" if !value.is_empty() => {
222 name = Some(value);
223 }
224 _ => {}
225 }
226 }
227
228 if message_count.is_none() || name.is_none() {
229 let entries = read_all_entries(&conn)?;
230 let (fallback_message_count, fallback_name) = compute_message_count_and_name(&entries);
231 if message_count.is_none() {
232 message_count = Some(fallback_message_count);
233 }
234 if name.is_none() {
235 name = fallback_name;
236 }
237 }
238
239 Ok(SqliteSessionMeta {
240 header,
241 message_count: message_count.unwrap_or(0),
242 name,
243 })
244}
245
// Unit tests for the SQLite session store: the pure metadata helper, error
// mapping, and round-trips through real database files in temporary directories.
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use crate::model::UserContent;
    use crate::session::{EntryBase, MessageEntry, SessionInfoEntry, SessionMessage};

    // Shared entry base with fixed id and timestamp.
    fn dummy_base() -> EntryBase {
        EntryBase {
            id: Some("test-id".to_string()),
            parent_id: None,
            timestamp: "2026-01-01T00:00:00.000Z".to_string(),
        }
    }

    // A user message entry with the text "hello".
    fn message_entry() -> SessionEntry {
        SessionEntry::Message(MessageEntry {
            base: dummy_base(),
            message: SessionMessage::User {
                content: UserContent::Text("hello".to_string()),
                timestamp: None,
            },
        })
    }

    // A session-info entry carrying the given optional name.
    fn session_info_entry(name: Option<String>) -> SessionEntry {
        SessionEntry::SessionInfo(SessionInfoEntry {
            base: dummy_base(),
            name,
        })
    }

    // ---- compute_message_count_and_name: counting and naming ----

    #[test]
    fn compute_counts_empty() {
        let (count, name) = compute_message_count_and_name(&[]);
        assert_eq!(count, 0);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_messages_only() {
        let entries = vec![message_entry(), message_entry(), message_entry()];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_session_info_with_name() {
        let entries = vec![
            message_entry(),
            session_info_entry(Some("My Session".to_string())),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 2);
        assert_eq!(name, Some("My Session".to_string()));
    }

    // A later SessionInfo entry without a name must not clear an earlier name.
    #[test]
    fn compute_counts_session_info_none_name_ignored() {
        let entries = vec![
            session_info_entry(Some("First".to_string())),
            session_info_entry(None),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert_eq!(name, Some("First".to_string()));
    }

    #[test]
    fn compute_counts_latest_name_wins() {
        let entries = vec![
            session_info_entry(Some("First".to_string())),
            session_info_entry(Some("Second".to_string())),
        ];
        let (_, name) = compute_message_count_and_name(&entries);
        assert_eq!(name, Some("Second".to_string()));
    }

    // ---- compute_message_count_and_name: non-message entry kinds are ignored ----

    #[test]
    fn compute_counts_ignores_model_change_entries() {
        use crate::session::ModelChangeEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::ModelChange(ModelChangeEntry {
                base: dummy_base(),
                provider: "anthropic".to_string(),
                model_id: "claude-sonnet-4-5".to_string(),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 2);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_label_entries() {
        use crate::session::LabelEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::Label(LabelEntry {
                base: dummy_base(),
                target_id: "some-id".to_string(),
                label: Some("important".to_string()),
            }),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_custom_entries() {
        use crate::session::CustomEntry;
        let entries = vec![
            SessionEntry::Custom(CustomEntry {
                base: dummy_base(),
                custom_type: "my_custom".to_string(),
                data: Some(serde_json::json!({"key": "value"})),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_compaction_entries() {
        use crate::session::CompactionEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::Compaction(CompactionEntry {
                base: dummy_base(),
                summary: "summary text".to_string(),
                first_kept_entry_id: "e1".to_string(),
                tokens_before: 500,
                details: None,
                from_hook: None,
            }),
            message_entry(),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_mixed_entry_types() {
        use crate::session::{CompactionEntry, CustomEntry, LabelEntry, ModelChangeEntry};
        let entries = vec![
            message_entry(),
            SessionEntry::ModelChange(ModelChangeEntry {
                base: dummy_base(),
                provider: "openai".to_string(),
                model_id: "gpt-4".to_string(),
            }),
            session_info_entry(Some("Named".to_string())),
            SessionEntry::Label(LabelEntry {
                base: dummy_base(),
                target_id: "t1".to_string(),
                label: None,
            }),
            message_entry(),
            SessionEntry::Compaction(CompactionEntry {
                base: dummy_base(),
                summary: "s".to_string(),
                first_kept_entry_id: "e1".to_string(),
                tokens_before: 100,
                details: None,
                from_hook: None,
            }),
            SessionEntry::Custom(CustomEntry {
                base: dummy_base(),
                custom_type: "ct".to_string(),
                data: None,
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert_eq!(name, Some("Named".to_string()));
    }

    // ---- map_sqlite_result ----

    #[test]
    fn map_sqlite_result_ok() {
        let result = map_sqlite_result::<i32>(Ok(42));
        assert_eq!(result.unwrap(), 42);
    }

    // Opening this path (it contains an embedded NUL) is expected to fail, which
    // gives the test a real SQLite error to map.
    #[test]
    fn map_sqlite_result_err() {
        let config = SqliteConfig::file("bad\0path").flags(OpenFlags::create_read_write());
        let result = map_sqlite_result::<i32>(SqliteConnection::open(&config).map(|_| 42));
        let err = result.unwrap_err();
        match err {
            Error::Session(message) => {
                assert!(message.contains("SQLite session error"));
            }
            other => unreachable!("Unexpected error: {:?}", other),
        }
    }

    // ---- SqliteSessionMeta construction ----

    #[test]
    fn sqlite_session_meta_fields() {
        let meta = SqliteSessionMeta {
            header: SessionHeader {
                id: "test-session".to_string(),
                ..SessionHeader::default()
            },
            message_count: 42,
            name: Some("My Session".to_string()),
        };
        assert_eq!(meta.header.id, "test-session");
        assert_eq!(meta.message_count, 42);
        assert_eq!(meta.name.as_deref(), Some("My Session"));
    }

    #[test]
    fn sqlite_session_meta_no_name() {
        let meta = SqliteSessionMeta {
            header: SessionHeader::default(),
            message_count: 0,
            name: None,
        };
        assert_eq!(meta.message_count, 0);
        assert!(meta.name.is_none());
    }

    // ---- compute_message_count_and_name: additional cases ----

    #[test]
    fn compute_counts_large_message_set() {
        let entries: Vec<SessionEntry> = (0..1000).map(|_| message_entry()).collect();
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1000);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_name_set_early_persists() {
        let entries = vec![
            session_info_entry(Some("Early Name".to_string())),
            message_entry(),
            message_entry(),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 3);
        assert_eq!(name, Some("Early Name".to_string()));
    }

    #[test]
    fn compute_counts_ignores_branch_summary() {
        use crate::session::BranchSummaryEntry;
        let entries = vec![
            message_entry(),
            SessionEntry::BranchSummary(BranchSummaryEntry {
                base: dummy_base(),
                from_id: "parent-id".to_string(),
                summary: "branch summary".to_string(),
                details: None,
                from_hook: None,
            }),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    #[test]
    fn compute_counts_ignores_thinking_level_change() {
        use crate::session::ThinkingLevelChangeEntry;
        let entries = vec![
            SessionEntry::ThinkingLevelChange(ThinkingLevelChangeEntry {
                base: dummy_base(),
                thinking_level: "high".to_string(),
            }),
            message_entry(),
        ];
        let (count, name) = compute_message_count_and_name(&entries);
        assert_eq!(count, 1);
        assert!(name.is_none());
    }

    // ---- header validation on save and load ----

    #[test]
    fn save_session_rejects_semantically_invalid_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid.sqlite");
        let header = SessionHeader {
            r#type: "note".to_string(),
            ..SessionHeader::default()
        };

        let err = futures::executor::block_on(async { save_session(&path, &header, &[]).await })
            .expect_err("invalid header should fail");
        let message = err.to_string();
        assert!(
            message.contains("Invalid session header"),
            "expected invalid session header error, got {message}"
        );
    }

    // Saves a valid session, then overwrites the stored header JSON with an
    // invalid one directly in the database before loading.
    #[test]
    fn load_session_meta_rejects_semantically_invalid_header() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid.sqlite");
        let header = SessionHeader {
            id: "sqlite-test".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async { save_session(&path, &header, &[]).await })
            .expect("save sqlite session");

        let invalid_header = SessionHeader {
            r#type: "note".to_string(),
            ..header
        };
        let invalid_json =
            serde_json::to_string(&invalid_header).expect("serialize invalid session header");
        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_sync(
            "UPDATE pi_session_header SET json = ?1",
            &[sqlmodel_core::Value::Text(invalid_json)],
        )
        .expect("corrupt sqlite header row");

        let err = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect_err("invalid header should fail");
        let message = err.to_string();
        assert!(
            message.contains("Invalid session header"),
            "expected invalid session header error, got {message}"
        );
    }

    // ---- load_session_meta fallbacks when metadata is incomplete ----

    // Deletes only the `name` meta row; the name must be recovered from entries.
    #[test]
    fn load_session_meta_falls_back_to_entries_when_name_row_missing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("missing-name-row.sqlite");
        let header = SessionHeader {
            id: "sqlite-name-fallback".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Recovered Name".to_string())),
            message_entry(),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_sync(
            "DELETE FROM pi_session_meta WHERE key = ?1",
            &[SqliteValue::Text("name".to_string())],
        )
        .expect("delete name meta row");

        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load sqlite meta");
        assert_eq!(meta.message_count, 2);
        assert_eq!(meta.name.as_deref(), Some("Recovered Name"));
    }

    // Drops the whole meta table; both count and name must come from entries.
    #[test]
    fn load_session_meta_falls_back_when_meta_table_missing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("missing-meta-table.sqlite");
        let header = SessionHeader {
            id: "sqlite-missing-meta".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Recovered From Entries".to_string())),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_raw("DROP TABLE pi_session_meta")
            .expect("drop sqlite meta table");

        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load sqlite meta");
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.name.as_deref(), Some("Recovered From Entries"));
    }

    // Recreates the meta table without its `value` column; the meta query is
    // expected to fail with an error rather than trigger the fallback.
    #[test]
    fn load_session_meta_rejects_invalid_meta_table_schema() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("invalid-meta-schema.sqlite");
        let header = SessionHeader {
            id: "sqlite-invalid-meta-schema".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async {
            save_session(&path, &header, &[message_entry()]).await
        })
        .expect("save sqlite session");

        let config = sqlmodel_sqlite::SqliteConfig::file(path.to_string_lossy())
            .flags(sqlmodel_sqlite::OpenFlags::create_read_write());
        let conn = sqlmodel_sqlite::SqliteConnection::open(&config).expect("open sqlite db");
        conn.execute_raw("DROP TABLE pi_session_meta")
            .expect("drop sqlite meta table");
        conn.execute_raw("CREATE TABLE pi_session_meta (key TEXT PRIMARY KEY)")
            .expect("create invalid sqlite meta table");

        let err = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect_err("invalid meta schema should fail");
        let message = err.to_string();
        assert!(
            message.contains("SQLite session meta query failed"),
            "expected meta query error, got {message}"
        );
    }

    // ---- file permissions (Unix only) ----

    // Both load paths must work on a database file chmod-ed to 0o444.
    #[cfg(unix)]
    #[test]
    fn load_paths_accept_read_only_sqlite_files() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("readonly.sqlite");
        let header = SessionHeader {
            id: "sqlite-readonly".to_string(),
            ..SessionHeader::default()
        };
        let entries = vec![
            session_info_entry(Some("Read Only".to_string())),
            message_entry(),
        ];

        futures::executor::block_on(async { save_session(&path, &header, &entries).await })
            .expect("save sqlite session");

        let original_mode = std::fs::metadata(&path)
            .expect("sqlite metadata")
            .permissions()
            .mode();
        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o444))
            .expect("chmod readonly sqlite");

        let (loaded_header, loaded_entries) =
            futures::executor::block_on(async { load_session(&path).await })
                .expect("load readonly sqlite session");
        let meta = futures::executor::block_on(async { load_session_meta(&path).await })
            .expect("load readonly sqlite meta");

        // Restore the original mode before asserting.
        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(original_mode))
            .expect("restore sqlite permissions");

        assert_eq!(loaded_header.id, header.id);
        assert_eq!(loaded_entries.len(), entries.len());
        assert_eq!(meta.header.id, header.id);
        assert_eq!(meta.message_count, 1);
        assert_eq!(meta.name.as_deref(), Some("Read Only"));
    }

    // Every artifact that exists after a save (database, -wal, -shm) must be 0o600.
    #[cfg(unix)]
    #[test]
    fn save_session_sets_private_permissions_for_sqlite_artifacts() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("secure.sqlite");
        let header = SessionHeader {
            id: "sqlite-secure".to_string(),
            ..SessionHeader::default()
        };

        futures::executor::block_on(async {
            save_session(&path, &header, &[message_entry()]).await
        })
        .expect("save sqlite session");

        for artifact in sqlite_artifact_paths(&path) {
            if artifact.exists() {
                let mode = std::fs::metadata(&artifact)
                    .expect("sqlite artifact metadata")
                    .permissions()
                    .mode()
                    & 0o777;
                assert_eq!(
                    mode,
                    0o600,
                    "expected private permissions for {}",
                    artifact.display()
                );
            }
        }
    }
}
769
770pub async fn save_session(
771 path: &Path,
772 header: &SessionHeader,
773 entries: &[SessionEntry],
774) -> Result<()> {
775 header
776 .validate()
777 .map_err(|reason| Error::session(format!("Invalid session header: {reason}")))?;
778 let metrics = session_metrics::global();
779 let _save_timer = metrics.start_timer(&metrics.sqlite_save);
780
781 if let Some(parent) = path.parent() {
782 asupersync::fs::create_dir_all(parent).await?;
783 }
784
785 let conn = open_sqlite_connection_read_write(path)?;
786 map_sqlite_result(conn.execute_raw(INIT_SQL))?;
787 ensure_private_sqlite_permissions(path)?;
788 map_sqlite_result(conn.execute_raw("BEGIN IMMEDIATE"))?;
789
790 let save_result = (|| -> Result<()> {
792 map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_entries", &[]))?;
793 map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_header", &[]))?;
794 map_sqlite_result(conn.execute_sync("DELETE FROM pi_session_meta", &[]))?;
795
796 let serialize_timer = metrics.start_timer(&metrics.sqlite_serialize);
797 let header_json = serde_json::to_string(header)?;
798 let mut total_json_bytes = header_json.len() as u64;
799
800 let mut entry_jsons = Vec::with_capacity(entries.len());
801 for entry in entries {
802 let json = serde_json::to_string(entry)?;
803 total_json_bytes += json.len() as u64;
804 entry_jsons.push(json);
805 }
806 serialize_timer.finish();
807 metrics.record_bytes(&metrics.sqlite_bytes, total_json_bytes);
808
809 map_sqlite_result(conn.execute_sync(
810 "INSERT INTO pi_session_header (id,json) VALUES (?1,?2)",
811 &[
812 SqliteValue::Text(header.id.clone()),
813 SqliteValue::Text(header_json),
814 ],
815 ))?;
816
817 let mut seq = 1_i64;
818 for chunk in entry_jsons.chunks(200) {
819 let mut sql = String::with_capacity(64 + chunk.len() * 16);
820 sql.push_str("INSERT INTO pi_session_entries (seq,json) VALUES ");
821 let mut params = Vec::with_capacity(chunk.len() * 2);
822 for (i, json) in chunk.iter().enumerate() {
823 if i > 0 {
824 sql.push(',');
825 }
826 let _ = write!(sql, "(?{},?{})", i * 2 + 1, i * 2 + 2);
827 params.push(SqliteValue::BigInt(seq));
828 params.push(SqliteValue::Text(json.clone()));
829 seq += 1;
830 }
831 map_sqlite_result(conn.execute_sync(&sql, ¶ms))?;
832 }
833
834 let (message_count, name) = compute_message_count_and_name(entries);
835 map_sqlite_result(conn.execute_sync(
836 "INSERT INTO pi_session_meta (key,value) VALUES (?1,?2)",
837 &[
838 SqliteValue::Text("message_count".to_string()),
839 SqliteValue::Text(message_count.to_string()),
840 ],
841 ))?;
842 let name_value = name.unwrap_or_default();
843 map_sqlite_result(conn.execute_sync(
844 "INSERT INTO pi_session_meta (key,value) VALUES (?1,?2)",
845 &[
846 SqliteValue::Text("name".to_string()),
847 SqliteValue::Text(name_value),
848 ],
849 ))?;
850
851 Ok(())
852 })();
853
854 match save_result {
855 Ok(()) => {
856 map_sqlite_result(conn.execute_raw("COMMIT"))?;
857 ensure_private_sqlite_permissions(path)?;
858 Ok(())
859 }
860 Err(err) => {
861 rollback_quietly(&conn);
862 Err(err)
863 }
864 }
865}
866
867pub async fn append_entries(
876 path: &Path,
877 new_entries: &[SessionEntry],
878 start_seq: usize,
879 message_count: u64,
880 session_name: Option<&str>,
881) -> Result<()> {
882 let metrics = session_metrics::global();
883 let _timer = metrics.start_timer(&metrics.sqlite_append);
884
885 let conn = open_sqlite_connection_read_write(path)?;
886
887 map_sqlite_result(conn.execute_raw(INIT_SQL))?;
889 ensure_private_sqlite_permissions(path)?;
890 map_sqlite_result(conn.execute_raw("BEGIN IMMEDIATE"))?;
891
892 let append_result = (|| -> Result<()> {
893 let serialize_timer = metrics.start_timer(&metrics.sqlite_serialize);
895 let mut total_json_bytes = 0u64;
896 let mut entry_jsons = Vec::with_capacity(new_entries.len());
897 for entry in new_entries {
898 let json = serde_json::to_string(entry)?;
899 total_json_bytes += json.len() as u64;
900 entry_jsons.push(json);
901 }
902 serialize_timer.finish();
903 metrics.record_bytes(&metrics.sqlite_bytes, total_json_bytes);
904
905 let mut seq = i64::try_from(start_seq)
906 .unwrap_or(i64::MAX.saturating_sub(1))
907 .saturating_add(1);
908 for chunk in entry_jsons.chunks(200) {
909 let mut sql = String::with_capacity(64 + chunk.len() * 16);
910 sql.push_str("INSERT INTO pi_session_entries (seq,json) VALUES ");
911 let mut params = Vec::with_capacity(chunk.len() * 2);
912 for (i, json) in chunk.iter().enumerate() {
913 if i > 0 {
914 sql.push(',');
915 }
916 let _ = write!(sql, "(?{},?{})", i * 2 + 1, i * 2 + 2);
917 params.push(SqliteValue::BigInt(seq));
918 params.push(SqliteValue::Text(json.clone()));
919 seq += 1;
920 }
921 map_sqlite_result(conn.execute_sync(&sql, ¶ms))?;
922 }
923
924 map_sqlite_result(conn.execute_sync(
926 "INSERT OR REPLACE INTO pi_session_meta (key,value) VALUES (?1,?2)",
927 &[
928 SqliteValue::Text("message_count".to_string()),
929 SqliteValue::Text(message_count.to_string()),
930 ],
931 ))?;
932 let name_value = session_name.unwrap_or("");
933 map_sqlite_result(conn.execute_sync(
934 "INSERT OR REPLACE INTO pi_session_meta (key,value) VALUES (?1,?2)",
935 &[
936 SqliteValue::Text("name".to_string()),
937 SqliteValue::Text(name_value.to_string()),
938 ],
939 ))?;
940
941 Ok(())
942 })();
943
944 match append_result {
945 Ok(()) => {
946 map_sqlite_result(conn.execute_raw("COMMIT"))?;
947 ensure_private_sqlite_permissions(path)?;
948 Ok(())
949 }
950 Err(err) => {
951 rollback_quietly(&conn);
952 Err(err)
953 }
954 }
955}