1use anyhow::{Context, Result, bail};
2use soma_studio_core::{
3 AppConfig, ConversationMessage, ConversationSummary, IngestJobSummary, IngestStatusResponse,
4 ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
5 WorkspaceFileChangeAuditEntry, WorkspaceFileChangeAuditStatus,
6 WorkspaceFileChangePreviewRequest, WorkspaceTaskId, WorkspaceTaskRunStatus,
7 WorkspaceTaskRunSummary,
8};
9use std::time::{SystemTime, UNIX_EPOCH};
10use turso::{Builder, Connection, Row};
11use uuid::Uuid;
12
// Presumably the fallback title for conversations created without an explicit
// one; the code that reads it lies outside this chunk — TODO confirm.
const DEFAULT_CONVERSATION_TITLE: &str = "New conversation";
// Presumably an upper bound on the stored conversation title length (unit —
// bytes vs. chars — is not visible here); TODO confirm against the title helper.
const CONVERSATION_TITLE_LIMIT: usize = 60;
// Number of most-recent workspace task runs / file change audits kept per
// session; it is passed as the retain limit to the prune helpers after writes.
const WORKSPACE_HISTORY_RETAIN_PER_SESSION: usize = 200;
/// Schema version this build expects. It is compared against, and written to,
/// the database's `PRAGMA user_version` during schema migration.
pub const STORAGE_SCHEMA_VERSION: i64 = 3;
17
/// Handle to the studio's local Turso database.
///
/// Wraps a single [`Connection`]; the storage methods in this file issue their
/// SQL through it.
#[derive(Debug, Clone)]
pub struct StudioStorage {
    /// Connection created by `StudioStorage::open`.
    conn: Connection,
}
22
/// Input for `StudioStorage::create_message`: the caller-supplied columns of a
/// new `messages` row (the id and sequence are generated by the storage layer).
#[derive(Debug, Clone)]
pub struct NewConversationMessage {
    /// Id of the conversation the message belongs to.
    pub conversation_id: String,
    /// Message role; `create_message` treats `"user"` specially when updating
    /// the conversation's activity.
    pub role: String,
    /// Message body text.
    pub content: String,
    /// Free-form status label stored as-is (valid values are not visible here).
    pub status: String,
    /// Optional provider name stored alongside the message.
    pub provider: Option<String>,
    /// Optional model id stored alongside the message.
    pub model_id: Option<String>,
}
32
/// Row describing an indexed source file.
///
/// NOTE(review): the code that builds this type is outside this chunk; the
/// field names match columns of the `source_files` table — confirm.
#[derive(Debug, Clone)]
pub struct IndexedSourceFileRow {
    /// Id of the source root the file belongs to (stored as text).
    pub source_root_id: String,
    /// File path relative to its source root.
    pub relative_path: String,
    /// Absolute file path.
    pub absolute_path: String,
}
39
/// Snapshot of the database schema version versus the version this build expects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageSchemaStatus {
    /// Version currently recorded in the database.
    pub current_version: i64,
    /// Version this build of the storage layer expects.
    pub expected_version: i64,
}

impl StorageSchemaStatus {
    /// Returns `true` when the recorded version equals the expected version.
    pub fn is_current(&self) -> bool {
        let Self {
            current_version,
            expected_version,
        } = self;
        current_version == expected_version
    }
}
51
52impl StudioStorage {
53 pub async fn open(config: &AppConfig) -> Result<Self> {
54 let db_path = config
55 .db_path
56 .to_str()
57 .with_context(|| format!("invalid db path {}", config.db_path.display()))?;
58 let db = Builder::new_local(db_path).build().await.with_context(|| {
59 format!(
60 "failed to open local Turso db at {}",
61 config.db_path.display()
62 )
63 })?;
64 let conn = db
65 .connect()
66 .context("failed to connect to local Turso db")?;
67 let storage = Self { conn };
68 storage.reject_newer_schema_version().await?;
69 storage.initialize_schema().await?;
70 Ok(storage)
71 }
72
73 async fn initialize_schema(&self) -> Result<()> {
74 self.conn
75 .execute_batch(
76 r#"
77 CREATE TABLE IF NOT EXISTS source_roots (
78 path TEXT PRIMARY KEY,
79 id TEXT NOT NULL,
80 read_only INTEGER NOT NULL DEFAULT 1,
81 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
82 );
83
84 CREATE TABLE IF NOT EXISTS provider_status (
85 provider TEXT PRIMARY KEY,
86 last_test_ok INTEGER,
87 last_test_detail TEXT,
88 last_tested_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
89 );
90
91 CREATE TABLE IF NOT EXISTS provider_selection (
92 singleton INTEGER PRIMARY KEY CHECK (singleton = 1),
93 provider TEXT,
94 model_id TEXT,
95 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
96 );
97
98 CREATE TABLE IF NOT EXISTS sessions (
99 id TEXT PRIMARY KEY,
100 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
101 );
102
103 CREATE TABLE IF NOT EXISTS conversations (
104 id TEXT PRIMARY KEY,
105 session_id TEXT NOT NULL,
106 title TEXT NOT NULL,
107 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
108 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
109 last_message_at TEXT
110 );
111
112 CREATE TABLE IF NOT EXISTS messages (
113 id TEXT PRIMARY KEY,
114 conversation_id TEXT NOT NULL,
115 sequence INTEGER NOT NULL,
116 role TEXT NOT NULL,
117 content TEXT NOT NULL,
118 status TEXT NOT NULL,
119 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
120 provider TEXT,
121 model_id TEXT
122 );
123
124 CREATE INDEX IF NOT EXISTS idx_conversations_recent
125 ON conversations (session_id, last_message_at DESC, updated_at DESC, created_at DESC);
126
127 CREATE INDEX IF NOT EXISTS idx_messages_conversation_created
128 ON messages (conversation_id, sequence);
129
130 CREATE TABLE IF NOT EXISTS source_files (
131 source_root_id TEXT NOT NULL,
132 relative_path TEXT NOT NULL,
133 absolute_path TEXT NOT NULL,
134 fingerprint TEXT NOT NULL,
135 size_bytes INTEGER NOT NULL,
136 modified_at TEXT NOT NULL,
137 file_type TEXT NOT NULL,
138 status TEXT NOT NULL,
139 last_error TEXT,
140 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
141 PRIMARY KEY (source_root_id, relative_path)
142 );
143
144 CREATE TABLE IF NOT EXISTS ingest_jobs (
145 id TEXT PRIMARY KEY,
146 source_root_id TEXT,
147 scope_label TEXT NOT NULL,
148 status TEXT NOT NULL,
149 file_count INTEGER NOT NULL DEFAULT 0,
150 started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
151 completed_at TEXT,
152 last_error TEXT
153 );
154
155 CREATE INDEX IF NOT EXISTS idx_ingest_jobs_started_at
156 ON ingest_jobs (started_at DESC);
157
158 CREATE TABLE IF NOT EXISTS workspace_task_runs (
159 run_id TEXT PRIMARY KEY,
160 session_id TEXT NOT NULL,
161 task_id TEXT NOT NULL,
162 path TEXT NOT NULL,
163 status TEXT NOT NULL,
164 command_label TEXT NOT NULL,
165 exit_code INTEGER,
166 stdout_tail TEXT NOT NULL,
167 stderr_tail TEXT NOT NULL,
168 stdout_truncated INTEGER NOT NULL,
169 stderr_truncated INTEGER NOT NULL,
170 timed_out INTEGER NOT NULL,
171 cancel_requested INTEGER NOT NULL,
172 started_at_ms INTEGER NOT NULL,
173 completed_at_ms INTEGER,
174 duration_ms INTEGER,
175 error TEXT,
176 error_code TEXT,
177 max_output_bytes INTEGER NOT NULL,
178 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
179 );
180
181 CREATE INDEX IF NOT EXISTS idx_workspace_task_runs_recent
182 ON workspace_task_runs (session_id, started_at_ms DESC);
183
184 CREATE TABLE IF NOT EXISTS workspace_file_change_audits (
185 audit_id TEXT PRIMARY KEY,
186 session_id TEXT NOT NULL,
187 action TEXT NOT NULL,
188 path TEXT NOT NULL,
189 target_path TEXT,
190 status TEXT NOT NULL,
191 error TEXT,
192 error_code TEXT,
193 size_bytes_before INTEGER,
194 size_bytes_after INTEGER,
195 created_at_ms INTEGER NOT NULL,
196 applied_at_ms INTEGER,
197 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
198 );
199
200 CREATE INDEX IF NOT EXISTS idx_workspace_file_change_audits_recent
201 ON workspace_file_change_audits (session_id, created_at_ms DESC);
202 "#,
203 )
204 .await
205 .context("failed to initialize Turso schema")?;
206 self.run_schema_migrations().await?;
207 self.mark_interrupted_workspace_task_runs().await?;
208 self.mark_interrupted_workspace_file_change_audits().await?;
209 Ok(())
210 }
211
212 async fn run_schema_migrations(&self) -> Result<()> {
213 let current_version = self.schema_user_version().await?;
214 if current_version > STORAGE_SCHEMA_VERSION {
215 bail!(
216 "database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
217 );
218 }
219
220 if current_version < 2 {
221 self.ensure_workspace_task_runs_error_code_column().await?;
222 }
223 if current_version < 3 {
224 self.ensure_workspace_file_change_audits_error_code_column()
225 .await?;
226 }
227
228 if current_version != STORAGE_SCHEMA_VERSION {
229 self.set_schema_user_version(STORAGE_SCHEMA_VERSION).await?;
230 }
231 Ok(())
232 }
233
234 async fn reject_newer_schema_version(&self) -> Result<()> {
235 let current_version = self.schema_user_version().await?;
236 if current_version > STORAGE_SCHEMA_VERSION {
237 bail!(
238 "database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
239 );
240 }
241 Ok(())
242 }
243
244 async fn schema_user_version(&self) -> Result<i64> {
245 let mut rows = self
246 .conn
247 .query("PRAGMA user_version", ())
248 .await
249 .context("failed to inspect database schema version")?;
250 let Some(row) = rows
251 .next()
252 .await
253 .context("failed to read database schema version row")?
254 else {
255 bail!("database schema version pragma returned no rows");
256 };
257 row.get(0)
258 .context("failed to decode database schema version")
259 }
260
261 async fn set_schema_user_version(&self, version: i64) -> Result<()> {
262 self.conn
263 .execute(&format!("PRAGMA user_version = {version}"), ())
264 .await
265 .with_context(|| format!("failed to set database schema version to {version}"))?;
266 Ok(())
267 }
268
269 pub async fn schema_status(&self) -> Result<StorageSchemaStatus> {
270 Ok(StorageSchemaStatus {
271 current_version: self.schema_user_version().await?,
272 expected_version: STORAGE_SCHEMA_VERSION,
273 })
274 }
275
276 async fn ensure_workspace_task_runs_error_code_column(&self) -> Result<()> {
277 let mut rows = self
278 .conn
279 .query("PRAGMA table_info(workspace_task_runs)", ())
280 .await
281 .context("failed to inspect workspace task run schema")?;
282 while let Some(row) = rows
283 .next()
284 .await
285 .context("failed to read workspace task run schema row")?
286 {
287 let name: String = row
288 .get(1)
289 .context("failed to decode workspace task run column name")?;
290 if name == "error_code" {
291 return Ok(());
292 }
293 }
294 drop(rows);
295
296 self.conn
297 .execute(
298 "ALTER TABLE workspace_task_runs ADD COLUMN error_code TEXT",
299 (),
300 )
301 .await
302 .context("failed to add workspace task run error_code column")?;
303 Ok(())
304 }
305
306 async fn ensure_workspace_file_change_audits_error_code_column(&self) -> Result<()> {
307 let mut rows = self
308 .conn
309 .query("PRAGMA table_info(workspace_file_change_audits)", ())
310 .await
311 .context("failed to inspect workspace file change audit schema")?;
312 while let Some(row) = rows
313 .next()
314 .await
315 .context("failed to read workspace file change audit schema row")?
316 {
317 let name: String = row
318 .get(1)
319 .context("failed to decode workspace file change audit column name")?;
320 if name == "error_code" {
321 return Ok(());
322 }
323 }
324 drop(rows);
325
326 self.conn
327 .execute(
328 "ALTER TABLE workspace_file_change_audits ADD COLUMN error_code TEXT",
329 (),
330 )
331 .await
332 .context("failed to add workspace file change audit error_code column")?;
333 Ok(())
334 }
335
336 async fn mark_interrupted_workspace_task_runs(&self) -> Result<()> {
337 let now_ms = unix_epoch_ms();
338 self.conn
339 .execute(
340 r#"
341 UPDATE workspace_task_runs
342 SET status = 'failed',
343 completed_at_ms = COALESCE(completed_at_ms, ?1),
344 duration_ms = COALESCE(duration_ms, MAX(0, ?1 - started_at_ms)),
345 error = COALESCE(error, 'workspace task interrupted before completion'),
346 error_code = COALESCE(error_code, 'workspace_task_interrupted'),
347 updated_at = CURRENT_TIMESTAMP
348 WHERE status IN ('queued', 'running')
349 "#,
350 [u64_to_i64(now_ms, "workspace task interrupted_at_ms")?],
351 )
352 .await
353 .context("failed to mark interrupted workspace task runs")?;
354 Ok(())
355 }
356
357 async fn mark_interrupted_workspace_file_change_audits(&self) -> Result<()> {
358 let now_ms = unix_epoch_ms();
359 self.conn
360 .execute(
361 r#"
362 UPDATE workspace_file_change_audits
363 SET status = 'failed',
364 applied_at_ms = COALESCE(applied_at_ms, ?1),
365 error = COALESCE(error, 'workspace file change interrupted before completion'),
366 error_code = COALESCE(error_code, 'workspace_file_change_interrupted'),
367 updated_at = CURRENT_TIMESTAMP
368 WHERE status = 'applying'
369 "#,
370 [u64_to_i64(
371 now_ms,
372 "workspace file change interrupted_at_ms",
373 )?],
374 )
375 .await
376 .context("failed to mark interrupted workspace file change audits")?;
377 Ok(())
378 }
379
380 pub async fn list_source_roots(&self) -> Result<Vec<SourceRootSummary>> {
381 let mut rows = self
382 .conn
383 .query(
384 "SELECT id, path, read_only FROM source_roots ORDER BY path",
385 (),
386 )
387 .await
388 .context("failed to query source_roots")?;
389
390 let mut source_roots = Vec::new();
391 while let Some(row) = rows
392 .next()
393 .await
394 .context("failed to read source_root row")?
395 {
396 let id: String = row.get(0).context("failed to decode source_root id")?;
397 let path: String = row.get(1).context("failed to decode source_root path")?;
398 let read_only: i64 = row
399 .get(2)
400 .context("failed to decode source_root read_only")?;
401 source_roots.push(SourceRootSummary {
402 id: Uuid::parse_str(&id)
403 .with_context(|| format!("invalid source_root UUID {id}"))?,
404 path,
405 read_only: read_only != 0,
406 });
407 }
408
409 Ok(source_roots)
410 }
411
412 pub async fn upsert_source_root(
413 &self,
414 summary: &SourceRootSummary,
415 ) -> Result<SourceRootSummary> {
416 self.conn
417 .execute(
418 "INSERT OR IGNORE INTO source_roots (path, id, read_only) VALUES (?1, ?2, ?3)",
419 (
420 summary.path.clone(),
421 summary.id.to_string(),
422 if summary.read_only { 1_i64 } else { 0_i64 },
423 ),
424 )
425 .await
426 .with_context(|| format!("failed to persist source root {}", summary.path))?;
427
428 self.find_source_root(&summary.path)
429 .await?
430 .with_context(|| format!("persisted source root missing for {}", summary.path))
431 }
432
433 async fn find_source_root(&self, path: &str) -> Result<Option<SourceRootSummary>> {
434 let mut stmt = self
435 .conn
436 .prepare("SELECT id, path, read_only FROM source_roots WHERE path = ?1 LIMIT 1")
437 .await
438 .context("failed to prepare source_root lookup")?;
439 let mut rows = stmt
440 .query([path])
441 .await
442 .with_context(|| format!("failed to lookup source root {path}"))?;
443
444 let Some(row) = rows.next().await.context("failed to read lookup row")? else {
445 return Ok(None);
446 };
447
448 let id: String = row.get(0).context("failed to decode lookup id")?;
449 let stored_path: String = row.get(1).context("failed to decode lookup path")?;
450 let read_only: i64 = row.get(2).context("failed to decode lookup read_only")?;
451
452 Ok(Some(SourceRootSummary {
453 id: Uuid::parse_str(&id).with_context(|| format!("invalid persisted UUID {id}"))?,
454 path: stored_path,
455 read_only: read_only != 0,
456 }))
457 }
458
459 pub async fn record_provider_test(&self, provider: &str, ok: bool, detail: &str) -> Result<()> {
460 self.conn
461 .execute(
462 r#"
463 INSERT INTO provider_status (provider, last_test_ok, last_test_detail, last_tested_at)
464 VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP)
465 ON CONFLICT(provider) DO UPDATE SET
466 last_test_ok = excluded.last_test_ok,
467 last_test_detail = excluded.last_test_detail,
468 last_tested_at = CURRENT_TIMESTAMP
469 "#,
470 (provider.to_string(), if ok { 1_i64 } else { 0_i64 }, detail.to_string()),
471 )
472 .await
473 .with_context(|| format!("failed to persist provider status for {provider}"))?;
474 Ok(())
475 }
476
477 pub async fn list_provider_statuses(&self) -> Result<Vec<ProviderStatusRow>> {
478 let mut rows = self
479 .conn
480 .query(
481 "SELECT provider, last_test_ok, last_test_detail, last_tested_at FROM provider_status",
482 (),
483 )
484 .await
485 .context("failed to query provider_status")?;
486
487 let mut statuses = Vec::new();
488 while let Some(row) = rows
489 .next()
490 .await
491 .context("failed to read provider_status row")?
492 {
493 let provider: String = row.get(0).context("failed to decode provider name")?;
494 let last_test_ok: Option<i64> = row.get(1).ok();
495 let last_test_detail: Option<String> = row.get(2).ok();
496 let last_tested_at: String =
497 row.get(3).context("failed to decode provider timestamp")?;
498 statuses.push(ProviderStatusRow {
499 provider,
500 last_test_ok: last_test_ok.map(|value| value != 0),
501 last_test_detail,
502 last_tested_at: Some(last_tested_at),
503 });
504 }
505
506 Ok(statuses)
507 }
508
509 pub async fn load_provider_selection(&self) -> Result<ProviderSelectionResponse> {
510 let mut rows = self
511 .conn
512 .query(
513 "SELECT provider, model_id FROM provider_selection WHERE singleton = 1",
514 (),
515 )
516 .await
517 .context("failed to query provider_selection")?;
518
519 let Some(row) = rows
520 .next()
521 .await
522 .context("failed to read provider_selection row")?
523 else {
524 return Ok(ProviderSelectionResponse {
525 selected_provider: None,
526 selected_model_id: None,
527 });
528 };
529
530 let selected_provider: Option<String> = row.get(0).ok();
531 let selected_model_id: Option<String> = row.get(1).ok();
532 Ok(ProviderSelectionResponse {
533 selected_provider,
534 selected_model_id,
535 })
536 }
537
538 pub async fn save_provider_selection(
539 &self,
540 selection: &ProviderSelectionResponse,
541 ) -> Result<ProviderSelectionResponse> {
542 self.conn
543 .execute(
544 r#"
545 INSERT INTO provider_selection (singleton, provider, model_id, updated_at)
546 VALUES (1, ?1, ?2, CURRENT_TIMESTAMP)
547 ON CONFLICT(singleton) DO UPDATE SET
548 provider = excluded.provider,
549 model_id = excluded.model_id,
550 updated_at = CURRENT_TIMESTAMP
551 "#,
552 (
553 selection.selected_provider.clone(),
554 selection.selected_model_id.clone(),
555 ),
556 )
557 .await
558 .context("failed to persist provider selection")?;
559
560 self.load_provider_selection().await
561 }
562
563 pub async fn persist_session(&self, session_id: &str) -> Result<()> {
564 self.conn
565 .execute(
566 "INSERT OR IGNORE INTO sessions (id) VALUES (?1)",
567 [session_id],
568 )
569 .await
570 .with_context(|| format!("failed to persist session {session_id}"))?;
571 Ok(())
572 }
573
574 pub async fn session_exists(&self, session_id: &str) -> Result<bool> {
575 let mut stmt = self
576 .conn
577 .prepare("SELECT id FROM sessions WHERE id = ?1 LIMIT 1")
578 .await
579 .context("failed to prepare session lookup")?;
580 let mut rows = stmt
581 .query([session_id])
582 .await
583 .with_context(|| format!("failed to lookup session {session_id}"))?;
584
585 Ok(rows
586 .next()
587 .await
588 .context("failed to read session row")?
589 .is_some())
590 }
591
592 pub async fn upsert_workspace_task_run(
593 &self,
594 session_id: &str,
595 summary: &WorkspaceTaskRunSummary,
596 ) -> Result<()> {
597 self.conn
598 .execute(
599 r#"
600 INSERT INTO workspace_task_runs (
601 run_id, session_id, task_id, path, status, command_label, exit_code,
602 stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
603 cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
604 error_code, max_output_bytes, updated_at
605 )
606 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, CURRENT_TIMESTAMP)
607 ON CONFLICT(run_id) DO UPDATE SET
608 task_id = excluded.task_id,
609 path = excluded.path,
610 status = excluded.status,
611 command_label = excluded.command_label,
612 exit_code = excluded.exit_code,
613 stdout_tail = excluded.stdout_tail,
614 stderr_tail = excluded.stderr_tail,
615 stdout_truncated = excluded.stdout_truncated,
616 stderr_truncated = excluded.stderr_truncated,
617 timed_out = excluded.timed_out,
618 cancel_requested = excluded.cancel_requested,
619 started_at_ms = excluded.started_at_ms,
620 completed_at_ms = excluded.completed_at_ms,
621 duration_ms = excluded.duration_ms,
622 error = excluded.error,
623 error_code = excluded.error_code,
624 max_output_bytes = excluded.max_output_bytes,
625 updated_at = CURRENT_TIMESTAMP
626 "#,
627 turso::params![
628 summary.run_id.to_string(),
629 session_id.to_string(),
630 workspace_task_id_label(summary.task_id).to_string(),
631 summary.path.clone(),
632 workspace_task_run_status_label(summary.status).to_string(),
633 summary.command_label.clone(),
634 summary.exit_code.map(i64::from),
635 summary.stdout_tail.clone(),
636 summary.stderr_tail.clone(),
637 bool_to_i64(summary.stdout_truncated),
638 bool_to_i64(summary.stderr_truncated),
639 bool_to_i64(summary.timed_out),
640 bool_to_i64(summary.cancel_requested),
641 u64_to_i64(summary.started_at_ms, "workspace task started_at_ms")?,
642 summary
643 .completed_at_ms
644 .map(|value| u64_to_i64(value, "workspace task completed_at_ms"))
645 .transpose()?,
646 summary
647 .duration_ms
648 .map(|value| u64_to_i64(value, "workspace task duration_ms"))
649 .transpose()?,
650 summary.error.clone(),
651 summary.error_code.clone(),
652 usize_to_i64(summary.max_output_bytes, "workspace task max_output_bytes")?,
653 ],
654 )
655 .await
656 .with_context(|| format!("failed to persist workspace task run {}", summary.run_id))?;
657 self.prune_workspace_task_run_history(session_id, WORKSPACE_HISTORY_RETAIN_PER_SESSION)
658 .await?;
659 Ok(())
660 }
661
662 pub async fn list_workspace_task_runs(
663 &self,
664 session_id: &str,
665 limit: usize,
666 error_code: Option<&str>,
667 ) -> Result<Vec<WorkspaceTaskRunSummary>> {
668 let limit = usize_to_i64(limit.clamp(1, 200), "workspace task run list limit")?;
669 let error_code = error_code.map(str::to_string);
670 let mut rows = self
671 .conn
672 .query(
673 r#"
674 SELECT run_id, task_id, path, status, command_label, exit_code,
675 stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
676 cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
677 error_code, max_output_bytes
678 FROM workspace_task_runs
679 WHERE session_id = ?1
680 AND (?2 IS NULL OR error_code = ?2)
681 ORDER BY started_at_ms DESC, run_id DESC
682 LIMIT ?3
683 "#,
684 (session_id.to_string(), error_code, limit),
685 )
686 .await
687 .context("failed to query workspace task runs")?;
688
689 let mut summaries = Vec::new();
690 while let Some(row) = rows
691 .next()
692 .await
693 .context("failed to read workspace task run row")?
694 {
695 summaries.push(decode_workspace_task_run_summary(&row)?);
696 }
697 Ok(summaries)
698 }
699
700 pub async fn get_workspace_task_run(
701 &self,
702 session_id: &str,
703 run_id: Uuid,
704 ) -> Result<Option<WorkspaceTaskRunSummary>> {
705 let mut stmt = self
706 .conn
707 .prepare(
708 r#"
709 SELECT run_id, task_id, path, status, command_label, exit_code,
710 stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
711 cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
712 error_code, max_output_bytes
713 FROM workspace_task_runs
714 WHERE session_id = ?1 AND run_id = ?2
715 LIMIT 1
716 "#,
717 )
718 .await
719 .context("failed to prepare workspace task run lookup")?;
720 let mut rows = stmt
721 .query((session_id.to_string(), run_id.to_string()))
722 .await
723 .with_context(|| format!("failed to lookup workspace task run {run_id}"))?;
724
725 let Some(row) = rows
726 .next()
727 .await
728 .context("failed to read workspace task run lookup row")?
729 else {
730 return Ok(None);
731 };
732 Ok(Some(decode_workspace_task_run_summary(&row)?))
733 }
734
735 pub async fn start_workspace_file_change_audit(
736 &self,
737 session_id: &str,
738 request: &WorkspaceFileChangePreviewRequest,
739 size_bytes_before: Option<u64>,
740 ) -> Result<WorkspaceFileChangeAuditEntry> {
741 let audit_id = Uuid::new_v4();
742 let created_at_ms = unix_epoch_ms();
743 self.conn
744 .execute(
745 r#"
746 INSERT INTO workspace_file_change_audits (
747 audit_id, session_id, action, path, target_path, status, error,
748 error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms, updated_at
749 )
750 VALUES (?1, ?2, ?3, ?4, ?5, 'applying', NULL, NULL, ?6, NULL, ?7, NULL, CURRENT_TIMESTAMP)
751 "#,
752 turso::params![
753 audit_id.to_string(),
754 session_id.to_string(),
755 workspace_file_change_action_label(request.action).to_string(),
756 request.path.clone(),
757 request.target_path.clone(),
758 size_bytes_before
759 .map(|value| u64_to_i64(value, "workspace file change size_bytes_before"))
760 .transpose()?,
761 u64_to_i64(created_at_ms, "workspace file change created_at_ms")?,
762 ],
763 )
764 .await
765 .with_context(|| {
766 format!(
767 "failed to start workspace file change audit for {}",
768 request.path
769 )
770 })?;
771 self.prune_workspace_file_change_audit_history(
772 session_id,
773 WORKSPACE_HISTORY_RETAIN_PER_SESSION,
774 )
775 .await?;
776
777 self.find_workspace_file_change_audit(session_id, audit_id)
778 .await?
779 .with_context(|| format!("created workspace file change audit missing for {audit_id}"))
780 }
781
782 pub async fn finish_workspace_file_change_audit(
783 &self,
784 session_id: &str,
785 audit_id: Uuid,
786 status: WorkspaceFileChangeAuditStatus,
787 error: Option<&str>,
788 error_code: Option<&str>,
789 size_bytes_after: Option<u64>,
790 ) -> Result<WorkspaceFileChangeAuditEntry> {
791 if matches!(status, WorkspaceFileChangeAuditStatus::Applying) {
792 anyhow::bail!("workspace file change audit finish status must be terminal");
793 }
794 let applied_at_ms = unix_epoch_ms();
795 self.conn
796 .execute(
797 r#"
798 UPDATE workspace_file_change_audits
799 SET status = ?3,
800 error = ?4,
801 error_code = ?5,
802 size_bytes_after = ?6,
803 applied_at_ms = ?7,
804 updated_at = CURRENT_TIMESTAMP
805 WHERE session_id = ?1 AND audit_id = ?2
806 "#,
807 turso::params![
808 session_id.to_string(),
809 audit_id.to_string(),
810 workspace_file_change_audit_status_label(status).to_string(),
811 error.map(str::to_string),
812 error_code.map(str::to_string),
813 size_bytes_after
814 .map(|value| u64_to_i64(value, "workspace file change size_bytes_after"))
815 .transpose()?,
816 u64_to_i64(applied_at_ms, "workspace file change applied_at_ms")?,
817 ],
818 )
819 .await
820 .with_context(|| format!("failed to finish workspace file change audit {audit_id}"))?;
821
822 self.find_workspace_file_change_audit(session_id, audit_id)
823 .await?
824 .with_context(|| format!("finished workspace file change audit missing for {audit_id}"))
825 }
826
827 pub async fn list_workspace_file_change_audits(
828 &self,
829 session_id: &str,
830 limit: usize,
831 error_code: Option<&str>,
832 ) -> Result<Vec<WorkspaceFileChangeAuditEntry>> {
833 let limit = usize_to_i64(
834 limit.clamp(1, 200),
835 "workspace file change audit list limit",
836 )?;
837 let error_code = error_code.map(str::to_string);
838 let mut rows = self
839 .conn
840 .query(
841 r#"
842 SELECT audit_id, action, path, target_path, status, error,
843 error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
844 FROM workspace_file_change_audits
845 WHERE session_id = ?1
846 AND (?2 IS NULL OR error_code = ?2)
847 ORDER BY created_at_ms DESC, audit_id DESC
848 LIMIT ?3
849 "#,
850 (session_id.to_string(), error_code, limit),
851 )
852 .await
853 .context("failed to query workspace file change audits")?;
854
855 let mut entries = Vec::new();
856 while let Some(row) = rows
857 .next()
858 .await
859 .context("failed to read workspace file change audit row")?
860 {
861 entries.push(decode_workspace_file_change_audit_entry(&row)?);
862 }
863 Ok(entries)
864 }
865
866 async fn find_workspace_file_change_audit(
867 &self,
868 session_id: &str,
869 audit_id: Uuid,
870 ) -> Result<Option<WorkspaceFileChangeAuditEntry>> {
871 let mut stmt = self
872 .conn
873 .prepare(
874 r#"
875 SELECT audit_id, action, path, target_path, status, error,
876 error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
877 FROM workspace_file_change_audits
878 WHERE session_id = ?1 AND audit_id = ?2
879 LIMIT 1
880 "#,
881 )
882 .await
883 .context("failed to prepare workspace file change audit lookup")?;
884 let mut rows = stmt
885 .query((session_id.to_string(), audit_id.to_string()))
886 .await
887 .with_context(|| format!("failed to lookup workspace file change audit {audit_id}"))?;
888
889 let Some(row) = rows
890 .next()
891 .await
892 .context("failed to read workspace file change audit lookup row")?
893 else {
894 return Ok(None);
895 };
896 Ok(Some(decode_workspace_file_change_audit_entry(&row)?))
897 }
898
899 async fn prune_workspace_task_run_history(
900 &self,
901 session_id: &str,
902 retain_limit: usize,
903 ) -> Result<()> {
904 let retain_limit = usize_to_i64(retain_limit.max(1), "workspace task retention limit")?;
905 self.conn
906 .execute(
907 r#"
908 DELETE FROM workspace_task_runs
909 WHERE session_id = ?1
910 AND run_id NOT IN (
911 SELECT run_id
912 FROM workspace_task_runs
913 WHERE session_id = ?1
914 ORDER BY started_at_ms DESC, run_id DESC
915 LIMIT ?2
916 )
917 "#,
918 (session_id.to_string(), retain_limit),
919 )
920 .await
921 .with_context(|| format!("failed to prune workspace task history for {session_id}"))?;
922 Ok(())
923 }
924
925 async fn prune_workspace_file_change_audit_history(
926 &self,
927 session_id: &str,
928 retain_limit: usize,
929 ) -> Result<()> {
930 let retain_limit =
931 usize_to_i64(retain_limit.max(1), "workspace file audit retention limit")?;
932 self.conn
933 .execute(
934 r#"
935 DELETE FROM workspace_file_change_audits
936 WHERE session_id = ?1
937 AND audit_id NOT IN (
938 SELECT audit_id
939 FROM workspace_file_change_audits
940 WHERE session_id = ?1
941 ORDER BY created_at_ms DESC, audit_id DESC
942 LIMIT ?2
943 )
944 "#,
945 (session_id.to_string(), retain_limit),
946 )
947 .await
948 .with_context(|| {
949 format!("failed to prune workspace file change audit history for {session_id}")
950 })?;
951 Ok(())
952 }
953
954 pub async fn create_conversation(
955 &self,
956 session_id: &str,
957 title: Option<&str>,
958 ) -> Result<ConversationSummary> {
959 let conversation_id = Uuid::new_v4().to_string();
960 self.insert_conversation(session_id, &conversation_id, title)
961 .await?;
962 self.find_conversation(session_id, &conversation_id)
963 .await?
964 .with_context(|| format!("persisted conversation missing for {conversation_id}"))
965 }
966
967 pub async fn ensure_conversation(
968 &self,
969 session_id: &str,
970 conversation_id: &str,
971 title_hint: Option<&str>,
972 ) -> Result<ConversationSummary> {
973 self.insert_conversation(session_id, conversation_id, title_hint)
974 .await?;
975 self.find_conversation(session_id, conversation_id)
976 .await?
977 .with_context(|| format!("persisted conversation missing for {conversation_id}"))
978 }
979
980 pub async fn list_conversations(&self, session_id: &str) -> Result<Vec<ConversationSummary>> {
981 let mut rows = self
982 .conn
983 .query(
984 r#"
985 SELECT id, title, created_at, updated_at, last_message_at
986 FROM conversations
987 WHERE session_id = ?1
988 ORDER BY COALESCE(last_message_at, updated_at, created_at) DESC, id DESC
989 "#,
990 [session_id],
991 )
992 .await
993 .context("failed to query conversations")?;
994
995 let mut conversations = Vec::new();
996 while let Some(row) = rows
997 .next()
998 .await
999 .context("failed to read conversation row")?
1000 {
1001 conversations.push(decode_conversation_row(&row)?);
1002 }
1003
1004 Ok(conversations)
1005 }
1006
1007 pub async fn find_conversation(
1008 &self,
1009 session_id: &str,
1010 conversation_id: &str,
1011 ) -> Result<Option<ConversationSummary>> {
1012 let mut stmt = self
1013 .conn
1014 .prepare(
1015 r#"
1016 SELECT id, title, created_at, updated_at, last_message_at
1017 FROM conversations
1018 WHERE session_id = ?1 AND id = ?2
1019 LIMIT 1
1020 "#,
1021 )
1022 .await
1023 .context("failed to prepare conversation lookup")?;
1024 let mut rows = stmt
1025 .query((session_id.to_string(), conversation_id.to_string()))
1026 .await
1027 .with_context(|| format!("failed to lookup conversation {conversation_id}"))?;
1028
1029 let Some(row) = rows
1030 .next()
1031 .await
1032 .context("failed to read conversation lookup row")?
1033 else {
1034 return Ok(None);
1035 };
1036
1037 Ok(Some(decode_conversation_row(&row)?))
1038 }
1039
1040 pub async fn delete_conversation(
1041 &self,
1042 session_id: &str,
1043 conversation_id: &str,
1044 ) -> Result<bool> {
1045 if self
1046 .find_conversation(session_id, conversation_id)
1047 .await?
1048 .is_none()
1049 {
1050 return Ok(false);
1051 }
1052
1053 self.conn
1054 .execute(
1055 "DELETE FROM messages WHERE conversation_id = ?1",
1056 [conversation_id],
1057 )
1058 .await
1059 .with_context(|| {
1060 format!("failed to delete messages for conversation {conversation_id}")
1061 })?;
1062
1063 let deleted = self
1064 .conn
1065 .execute("DELETE FROM conversations WHERE id = ?1", [conversation_id])
1066 .await
1067 .with_context(|| format!("failed to delete conversation {conversation_id}"))?;
1068
1069 Ok(deleted > 0)
1070 }
1071
1072 pub async fn list_conversation_messages(
1073 &self,
1074 session_id: &str,
1075 conversation_id: &str,
1076 ) -> Result<Vec<ConversationMessage>> {
1077 if self
1078 .find_conversation(session_id, conversation_id)
1079 .await?
1080 .is_none()
1081 {
1082 return Ok(Vec::new());
1083 }
1084
1085 let mut stmt = self
1086 .conn
1087 .prepare(
1088 r#"
1089 SELECT id, conversation_id, role, content, status, created_at, provider, model_id
1090 FROM messages
1091 WHERE conversation_id = ?1
1092 ORDER BY sequence ASC, id ASC
1093 "#,
1094 )
1095 .await
1096 .context("failed to prepare message list query")?;
1097 let mut rows = stmt.query([conversation_id]).await.with_context(|| {
1098 format!("failed to list messages for conversation {conversation_id}")
1099 })?;
1100
1101 let mut messages = Vec::new();
1102 while let Some(row) = rows.next().await.context("failed to read message row")? {
1103 messages.push(decode_message_row(&row)?);
1104 }
1105
1106 Ok(messages)
1107 }
1108
1109 pub async fn create_message(
1110 &self,
1111 message: &NewConversationMessage,
1112 ) -> Result<ConversationMessage> {
1113 let message_id = Uuid::new_v4().to_string();
1114 let sequence = self.next_message_sequence(&message.conversation_id).await?;
1115 self.conn
1116 .execute(
1117 r#"
1118 INSERT INTO messages (id, conversation_id, sequence, role, content, status, provider, model_id)
1119 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1120 "#,
1121 (
1122 message_id.clone(),
1123 message.conversation_id.clone(),
1124 sequence,
1125 message.role.clone(),
1126 message.content.clone(),
1127 message.status.clone(),
1128 message.provider.clone(),
1129 message.model_id.clone(),
1130 ),
1131 )
1132 .await
1133 .with_context(|| format!("failed to persist message for conversation {}", message.conversation_id))?;
1134
1135 self.touch_conversation_activity(
1136 &message.conversation_id,
1137 message.role.eq("user").then_some(message.content.as_str()),
1138 )
1139 .await?;
1140
1141 self.find_message(&message_id)
1142 .await?
1143 .with_context(|| format!("persisted message missing for {message_id}"))
1144 }
1145
1146 pub async fn update_message_content(
1147 &self,
1148 message_id: &str,
1149 content: &str,
1150 status: &str,
1151 ) -> Result<ConversationMessage> {
1152 let conversation_id = self
1153 .message_conversation_id(message_id)
1154 .await?
1155 .with_context(|| format!("message not found for update: {message_id}"))?;
1156
1157 self.conn
1158 .execute(
1159 "UPDATE messages SET content = ?2, status = ?3 WHERE id = ?1",
1160 (
1161 message_id.to_string(),
1162 content.to_string(),
1163 status.to_string(),
1164 ),
1165 )
1166 .await
1167 .with_context(|| format!("failed to update message {message_id}"))?;
1168
1169 self.touch_conversation_activity(&conversation_id, None)
1170 .await?;
1171
1172 self.find_message(message_id)
1173 .await?
1174 .with_context(|| format!("updated message missing for {message_id}"))
1175 }
1176
1177 pub async fn append_message_delta(
1178 &self,
1179 message_id: &str,
1180 delta: &str,
1181 ) -> Result<ConversationMessage> {
1182 let conversation_id = self
1183 .message_conversation_id(message_id)
1184 .await?
1185 .with_context(|| format!("message not found for delta append: {message_id}"))?;
1186
1187 self.conn
1188 .execute(
1189 "UPDATE messages SET content = content || ?2, status = 'streaming' WHERE id = ?1",
1190 (message_id.to_string(), delta.to_string()),
1191 )
1192 .await
1193 .with_context(|| format!("failed to append delta to message {message_id}"))?;
1194
1195 self.touch_conversation_activity(&conversation_id, None)
1196 .await?;
1197
1198 self.find_message(message_id)
1199 .await?
1200 .with_context(|| format!("delta-updated message missing for {message_id}"))
1201 }
1202
1203 pub async fn replace_source_files(
1204 &self,
1205 source_root_id: &str,
1206 files: &[crate::ingest::ScannedSourceFile],
1207 ) -> Result<()> {
1208 self.conn
1209 .execute(
1210 "DELETE FROM source_files WHERE source_root_id = ?1",
1211 [source_root_id],
1212 )
1213 .await
1214 .with_context(|| format!("failed to clear source files for {source_root_id}"))?;
1215
1216 for file in files {
1217 self.conn
1218 .execute(
1219 r#"
1220 INSERT INTO source_files (
1221 source_root_id, relative_path, absolute_path, fingerprint,
1222 size_bytes, modified_at, file_type, status, last_error, updated_at
1223 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, CURRENT_TIMESTAMP)
1224 "#,
1225 (
1226 file.source_root_id.clone(),
1227 file.relative_path.clone(),
1228 file.absolute_path.clone(),
1229 file.fingerprint.clone(),
1230 file.size_bytes,
1231 file.modified_at.clone(),
1232 file.file_type.clone(),
1233 file.status.clone(),
1234 file.last_error.clone(),
1235 ),
1236 )
1237 .await
1238 .with_context(|| format!("failed to persist source file {}", file.relative_path))?;
1239 }
1240
1241 Ok(())
1242 }
1243
1244 pub async fn create_ingest_job(
1245 &self,
1246 source_root_id: Option<&str>,
1247 scope_label: &str,
1248 ) -> Result<IngestJobSummary> {
1249 let job_id = Uuid::new_v4().to_string();
1250 self.conn
1251 .execute(
1252 r#"
1253 INSERT INTO ingest_jobs (id, source_root_id, scope_label, status, file_count, started_at)
1254 VALUES (?1, ?2, ?3, 'running', 0, CURRENT_TIMESTAMP)
1255 "#,
1256 (
1257 job_id.clone(),
1258 source_root_id.map(str::to_string),
1259 scope_label.to_string(),
1260 ),
1261 )
1262 .await
1263 .with_context(|| format!("failed to create ingest job for {scope_label}"))?;
1264 self.find_ingest_job(&job_id)
1265 .await?
1266 .with_context(|| format!("created ingest job missing for {job_id}"))
1267 }
1268
1269 pub async fn complete_ingest_job(
1270 &self,
1271 job_id: &str,
1272 status: &str,
1273 file_count: usize,
1274 last_error: Option<&str>,
1275 ) -> Result<IngestJobSummary> {
1276 self.conn
1277 .execute(
1278 r#"
1279 UPDATE ingest_jobs
1280 SET status = ?2, file_count = ?3, completed_at = CURRENT_TIMESTAMP, last_error = ?4
1281 WHERE id = ?1
1282 "#,
1283 (
1284 job_id.to_string(),
1285 status.to_string(),
1286 file_count as i64,
1287 last_error.map(str::to_string),
1288 ),
1289 )
1290 .await
1291 .with_context(|| format!("failed to complete ingest job {job_id}"))?;
1292 self.find_ingest_job(job_id)
1293 .await?
1294 .with_context(|| format!("completed ingest job missing for {job_id}"))
1295 }
1296
1297 pub async fn list_ingest_jobs(&self) -> Result<Vec<IngestJobSummary>> {
1298 let mut rows = self
1299 .conn
1300 .query(
1301 "SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs ORDER BY started_at DESC LIMIT 20",
1302 (),
1303 )
1304 .await
1305 .context("failed to query ingest jobs")?;
1306
1307 let mut jobs = Vec::new();
1308 while let Some(row) = rows.next().await.context("failed to read ingest job row")? {
1309 jobs.push(decode_ingest_job_row(&row)?);
1310 }
1311
1312 Ok(jobs)
1313 }
1314
1315 pub async fn load_ingest_status(&self) -> Result<IngestStatusResponse> {
1316 let jobs = self.list_ingest_jobs().await?;
1317 let running = jobs.iter().any(|job| job.status == "running");
1318 let total_source_files = self.count_source_files().await?;
1319 let indexed_text_files = self.count_indexed_text_files().await?;
1320 Ok(IngestStatusResponse {
1321 running,
1322 total_source_files,
1323 indexed_text_files,
1324 jobs,
1325 })
1326 }
1327
1328 async fn find_ingest_job(&self, job_id: &str) -> Result<Option<IngestJobSummary>> {
1329 let mut stmt = self
1330 .conn
1331 .prepare(
1332 "SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs WHERE id = ?1 LIMIT 1",
1333 )
1334 .await
1335 .context("failed to prepare ingest job lookup")?;
1336 let mut rows = stmt
1337 .query([job_id])
1338 .await
1339 .with_context(|| format!("failed to lookup ingest job {job_id}"))?;
1340
1341 let Some(row) = rows
1342 .next()
1343 .await
1344 .context("failed to read ingest job lookup row")?
1345 else {
1346 return Ok(None);
1347 };
1348
1349 Ok(Some(decode_ingest_job_row(&row)?))
1350 }
1351
1352 async fn count_source_files(&self) -> Result<usize> {
1353 let mut rows = self
1354 .conn
1355 .query("SELECT COUNT(*) FROM source_files", ())
1356 .await
1357 .context("failed to count source files")?;
1358 let Some(row) = rows
1359 .next()
1360 .await
1361 .context("failed to read source file count")?
1362 else {
1363 return Ok(0);
1364 };
1365 let count: i64 = row.get(0).context("failed to decode source file count")?;
1366 Ok(count as usize)
1367 }
1368
1369 async fn count_indexed_text_files(&self) -> Result<usize> {
1370 let mut rows = self
1371 .conn
1372 .query(
1373 "SELECT COUNT(*) FROM source_files WHERE status = 'indexed'",
1374 (),
1375 )
1376 .await
1377 .context("failed to count indexed text files")?;
1378 let Some(row) = rows
1379 .next()
1380 .await
1381 .context("failed to read indexed text file count")?
1382 else {
1383 return Ok(0);
1384 };
1385 let count: i64 = row
1386 .get(0)
1387 .context("failed to decode indexed text file count")?;
1388 Ok(count as usize)
1389 }
1390
1391 pub async fn list_indexed_source_files(&self) -> Result<Vec<IndexedSourceFileRow>> {
1392 let mut rows = self
1393 .conn
1394 .query(
1395 "SELECT source_root_id, relative_path, absolute_path FROM source_files WHERE status = 'indexed' ORDER BY source_root_id, relative_path",
1396 (),
1397 )
1398 .await
1399 .context("failed to query indexed source files")?;
1400
1401 let mut files = Vec::new();
1402 while let Some(row) = rows
1403 .next()
1404 .await
1405 .context("failed to read indexed source file row")?
1406 {
1407 files.push(IndexedSourceFileRow {
1408 source_root_id: row
1409 .get(0)
1410 .context("failed to decode indexed source root id")?,
1411 relative_path: row
1412 .get(1)
1413 .context("failed to decode indexed source relative path")?,
1414 absolute_path: row
1415 .get(2)
1416 .context("failed to decode indexed source absolute path")?,
1417 });
1418 }
1419
1420 Ok(files)
1421 }
1422
1423 async fn insert_conversation(
1424 &self,
1425 session_id: &str,
1426 conversation_id: &str,
1427 title: Option<&str>,
1428 ) -> Result<()> {
1429 let normalized_title = normalize_conversation_title(title);
1430 self.conn
1431 .execute(
1432 r#"
1433 INSERT OR IGNORE INTO conversations (id, session_id, title, created_at, updated_at, last_message_at)
1434 VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL)
1435 "#,
1436 (
1437 conversation_id.to_string(),
1438 session_id.to_string(),
1439 normalized_title.clone(),
1440 ),
1441 )
1442 .await
1443 .with_context(|| format!("failed to create conversation {conversation_id}"))?;
1444
1445 if self
1446 .find_conversation(session_id, conversation_id)
1447 .await?
1448 .is_none()
1449 {
1450 anyhow::bail!("conversation belongs to a different session");
1451 }
1452
1453 self.conn
1454 .execute(
1455 r#"
1456 UPDATE conversations
1457 SET title = CASE
1458 WHEN title = ?2 AND ?3 IS NOT NULL THEN ?3
1459 ELSE title
1460 END
1461 WHERE id = ?1
1462 "#,
1463 (
1464 conversation_id.to_string(),
1465 DEFAULT_CONVERSATION_TITLE.to_string(),
1466 title.map(conversation_title_from_message),
1467 ),
1468 )
1469 .await
1470 .with_context(|| {
1471 format!("failed to normalize title for conversation {conversation_id}")
1472 })?;
1473
1474 Ok(())
1475 }
1476
1477 async fn next_message_sequence(&self, conversation_id: &str) -> Result<i64> {
1478 let mut stmt = self
1479 .conn
1480 .prepare(
1481 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM messages WHERE conversation_id = ?1",
1482 )
1483 .await
1484 .context("failed to prepare message sequence lookup")?;
1485 let mut rows = stmt.query([conversation_id]).await.with_context(|| {
1486 format!("failed to query next message sequence for {conversation_id}")
1487 })?;
1488
1489 let Some(row) = rows
1490 .next()
1491 .await
1492 .context("failed to read message sequence row")?
1493 else {
1494 return Ok(1);
1495 };
1496
1497 let sequence: i64 = row.get(0).context("failed to decode message sequence")?;
1498 Ok(sequence)
1499 }
1500
1501 async fn touch_conversation_activity(
1502 &self,
1503 conversation_id: &str,
1504 title_hint: Option<&str>,
1505 ) -> Result<()> {
1506 self.conn
1507 .execute(
1508 r#"
1509 UPDATE conversations
1510 SET title = CASE
1511 WHEN ?2 IS NOT NULL AND title = ?3 THEN ?2
1512 ELSE title
1513 END,
1514 updated_at = CURRENT_TIMESTAMP,
1515 last_message_at = CURRENT_TIMESTAMP
1516 WHERE id = ?1
1517 "#,
1518 (
1519 conversation_id.to_string(),
1520 title_hint.map(conversation_title_from_message),
1521 DEFAULT_CONVERSATION_TITLE.to_string(),
1522 ),
1523 )
1524 .await
1525 .with_context(|| format!("failed to touch conversation {conversation_id}"))?;
1526 Ok(())
1527 }
1528
1529 async fn find_message(&self, message_id: &str) -> Result<Option<ConversationMessage>> {
1530 let mut stmt = self
1531 .conn
1532 .prepare(
1533 r#"
1534 SELECT id, conversation_id, role, content, status, created_at, provider, model_id
1535 FROM messages
1536 WHERE id = ?1
1537 LIMIT 1
1538 "#,
1539 )
1540 .await
1541 .context("failed to prepare message lookup")?;
1542 let mut rows = stmt
1543 .query([message_id])
1544 .await
1545 .with_context(|| format!("failed to lookup message {message_id}"))?;
1546
1547 let Some(row) = rows
1548 .next()
1549 .await
1550 .context("failed to read message lookup row")?
1551 else {
1552 return Ok(None);
1553 };
1554
1555 Ok(Some(decode_message_row(&row)?))
1556 }
1557
1558 async fn message_conversation_id(&self, message_id: &str) -> Result<Option<String>> {
1559 let mut stmt = self
1560 .conn
1561 .prepare("SELECT conversation_id FROM messages WHERE id = ?1 LIMIT 1")
1562 .await
1563 .context("failed to prepare message conversation lookup")?;
1564 let mut rows = stmt
1565 .query([message_id])
1566 .await
1567 .with_context(|| format!("failed to lookup message conversation for {message_id}"))?;
1568
1569 let Some(row) = rows
1570 .next()
1571 .await
1572 .context("failed to read message conversation row")?
1573 else {
1574 return Ok(None);
1575 };
1576
1577 let conversation_id: String = row
1578 .get(0)
1579 .context("failed to decode message conversation id")?;
1580 Ok(Some(conversation_id))
1581 }
1582}
1583
/// Plain data holder describing the outcome of a provider's most recent test.
///
/// NOTE(review): presumably filled from a provider status query elsewhere in
/// this file; the optional fields are `None` when no value is available.
/// Confirm against the code that constructs it.
#[derive(Clone, Debug)]
pub struct ProviderStatusRow {
    /// Provider identifier.
    pub provider: String,
    /// Whether the last test succeeded, if a result is recorded.
    pub last_test_ok: Option<bool>,
    /// Free-form detail recorded for the last test, if any.
    pub last_test_detail: Option<String>,
    /// Textual timestamp of the last test, if any (format not visible here).
    pub last_tested_at: Option<String>,
}
1591
1592fn decode_ingest_job_row(row: &Row) -> Result<IngestJobSummary> {
1593 Ok(IngestJobSummary {
1594 id: row.get(0).context("failed to decode ingest job id")?,
1595 source_root_id: row.get(1).ok(),
1596 scope_label: row.get(2).context("failed to decode ingest scope label")?,
1597 status: row.get(3).context("failed to decode ingest status")?,
1598 file_count: row
1599 .get::<i64>(4)
1600 .context("failed to decode ingest file count")? as usize,
1601 started_at: row.get(5).context("failed to decode ingest started_at")?,
1602 completed_at: row.get(6).ok(),
1603 last_error: row.get(7).ok(),
1604 })
1605}
1606
1607fn decode_conversation_row(row: &Row) -> Result<ConversationSummary> {
1608 Ok(ConversationSummary {
1609 id: row.get(0).context("failed to decode conversation id")?,
1610 title: row.get(1).context("failed to decode conversation title")?,
1611 created_at: row
1612 .get(2)
1613 .context("failed to decode conversation created_at")?,
1614 updated_at: row
1615 .get(3)
1616 .context("failed to decode conversation updated_at")?,
1617 last_message_at: row.get(4).ok(),
1618 })
1619}
1620
1621fn decode_message_row(row: &Row) -> Result<ConversationMessage> {
1622 Ok(ConversationMessage {
1623 id: row.get(0).context("failed to decode message id")?,
1624 conversation_id: row
1625 .get(1)
1626 .context("failed to decode message conversation_id")?,
1627 role: row.get(2).context("failed to decode message role")?,
1628 content: row.get(3).context("failed to decode message content")?,
1629 status: row.get(4).context("failed to decode message status")?,
1630 created_at: row.get(5).context("failed to decode message created_at")?,
1631 provider: row.get(6).ok(),
1632 model_id: row.get(7).ok(),
1633 })
1634}
1635
1636fn decode_workspace_task_run_summary(row: &Row) -> Result<WorkspaceTaskRunSummary> {
1637 let run_id: String = row
1638 .get(0)
1639 .context("failed to decode workspace task run id")?;
1640 let task_id: String = row
1641 .get(1)
1642 .context("failed to decode workspace task run task_id")?;
1643 let status: String = row
1644 .get(3)
1645 .context("failed to decode workspace task run status")?;
1646 let exit_code: Option<i64> = row.get(5).ok();
1647 let stdout_truncated: i64 = row
1648 .get(8)
1649 .context("failed to decode workspace task stdout_truncated")?;
1650 let stderr_truncated: i64 = row
1651 .get(9)
1652 .context("failed to decode workspace task stderr_truncated")?;
1653 let timed_out: i64 = row
1654 .get(10)
1655 .context("failed to decode workspace task timed_out")?;
1656 let cancel_requested: i64 = row
1657 .get(11)
1658 .context("failed to decode workspace task cancel_requested")?;
1659 let started_at_ms: i64 = row
1660 .get(12)
1661 .context("failed to decode workspace task started_at_ms")?;
1662 let completed_at_ms: Option<i64> = row.get(13).ok();
1663 let duration_ms: Option<i64> = row.get(14).ok();
1664 let max_output_bytes: i64 = row
1665 .get(17)
1666 .context("failed to decode workspace task max_output_bytes")?;
1667
1668 Ok(WorkspaceTaskRunSummary {
1669 run_id: Uuid::parse_str(&run_id).with_context(|| format!("invalid run UUID {run_id}"))?,
1670 task_id: parse_workspace_task_id(&task_id)?,
1671 path: row.get(2).context("failed to decode workspace task path")?,
1672 status: parse_workspace_task_run_status(&status)?,
1673 command_label: row
1674 .get(4)
1675 .context("failed to decode workspace task command_label")?,
1676 exit_code: exit_code
1677 .map(|value| {
1678 i32::try_from(value)
1679 .with_context(|| format!("workspace task exit_code out of range: {value}"))
1680 })
1681 .transpose()?,
1682 stdout_tail: row
1683 .get(6)
1684 .context("failed to decode workspace task stdout_tail")?,
1685 stderr_tail: row
1686 .get(7)
1687 .context("failed to decode workspace task stderr_tail")?,
1688 stdout_truncated: stdout_truncated != 0,
1689 stderr_truncated: stderr_truncated != 0,
1690 timed_out: timed_out != 0,
1691 cancel_requested: cancel_requested != 0,
1692 started_at_ms: i64_to_u64(started_at_ms, "workspace task started_at_ms")?,
1693 completed_at_ms: completed_at_ms
1694 .map(|value| i64_to_u64(value, "workspace task completed_at_ms"))
1695 .transpose()?,
1696 duration_ms: duration_ms
1697 .map(|value| i64_to_u64(value, "workspace task duration_ms"))
1698 .transpose()?,
1699 error: row.get(15).ok(),
1700 error_code: row.get(16).ok(),
1701 max_output_bytes: usize::try_from(max_output_bytes).with_context(|| {
1702 format!("workspace task max_output_bytes out of range: {max_output_bytes}")
1703 })?,
1704 })
1705}
1706
1707fn decode_workspace_file_change_audit_entry(row: &Row) -> Result<WorkspaceFileChangeAuditEntry> {
1708 let audit_id: String = row
1709 .get(0)
1710 .context("failed to decode workspace file change audit id")?;
1711 let action: String = row
1712 .get(1)
1713 .context("failed to decode workspace file change action")?;
1714 let status: String = row
1715 .get(4)
1716 .context("failed to decode workspace file change status")?;
1717 let size_bytes_before: Option<i64> = row.get(7).ok();
1718 let size_bytes_after: Option<i64> = row.get(8).ok();
1719 let created_at_ms: i64 = row
1720 .get(9)
1721 .context("failed to decode workspace file change created_at_ms")?;
1722 let applied_at_ms: Option<i64> = row.get(10).ok();
1723
1724 Ok(WorkspaceFileChangeAuditEntry {
1725 audit_id: Uuid::parse_str(&audit_id)
1726 .with_context(|| format!("invalid workspace file change audit UUID {audit_id}"))?,
1727 action: parse_workspace_file_change_action(&action)?,
1728 path: row
1729 .get(2)
1730 .context("failed to decode workspace file change path")?,
1731 target_path: row.get(3).ok(),
1732 status: parse_workspace_file_change_audit_status(&status)?,
1733 error: row.get(5).ok(),
1734 error_code: row.get(6).ok(),
1735 size_bytes_before: size_bytes_before
1736 .map(|value| i64_to_u64(value, "workspace file change size_bytes_before"))
1737 .transpose()?,
1738 size_bytes_after: size_bytes_after
1739 .map(|value| i64_to_u64(value, "workspace file change size_bytes_after"))
1740 .transpose()?,
1741 created_at_ms: i64_to_u64(created_at_ms, "workspace file change created_at_ms")?,
1742 applied_at_ms: applied_at_ms
1743 .map(|value| i64_to_u64(value, "workspace file change applied_at_ms"))
1744 .transpose()?,
1745 })
1746}
1747
1748fn normalize_conversation_title(title: Option<&str>) -> String {
1749 title
1750 .map(str::trim)
1751 .filter(|value| !value.is_empty())
1752 .map(conversation_title_from_message)
1753 .unwrap_or_else(|| DEFAULT_CONVERSATION_TITLE.to_string())
1754}
1755
1756fn conversation_title_from_message(message: &str) -> String {
1757 let trimmed = message.trim();
1758 if trimmed.is_empty() {
1759 return DEFAULT_CONVERSATION_TITLE.to_string();
1760 }
1761
1762 let mut title = String::new();
1763 for ch in trimmed.chars().take(CONVERSATION_TITLE_LIMIT) {
1764 title.push(ch);
1765 }
1766
1767 if trimmed.chars().count() > CONVERSATION_TITLE_LIMIT {
1768 title.push_str("...");
1769 }
1770
1771 title
1772}
1773
1774fn workspace_task_id_label(task_id: WorkspaceTaskId) -> &'static str {
1775 match task_id {
1776 WorkspaceTaskId::GitStatus => "git_status",
1777 WorkspaceTaskId::GitDiff => "git_diff",
1778 WorkspaceTaskId::CargoCheck => "cargo_check",
1779 WorkspaceTaskId::NpmCheck => "npm_check",
1780 }
1781}
1782
1783fn parse_workspace_task_id(value: &str) -> Result<WorkspaceTaskId> {
1784 match value {
1785 "git_status" => Ok(WorkspaceTaskId::GitStatus),
1786 "git_diff" => Ok(WorkspaceTaskId::GitDiff),
1787 "cargo_check" => Ok(WorkspaceTaskId::CargoCheck),
1788 "npm_check" => Ok(WorkspaceTaskId::NpmCheck),
1789 _ => Err(anyhow::anyhow!("unsupported workspace task id: {value}")),
1790 }
1791}
1792
1793fn workspace_task_run_status_label(status: WorkspaceTaskRunStatus) -> &'static str {
1794 match status {
1795 WorkspaceTaskRunStatus::Queued => "queued",
1796 WorkspaceTaskRunStatus::Running => "running",
1797 WorkspaceTaskRunStatus::Complete => "complete",
1798 WorkspaceTaskRunStatus::Failed => "failed",
1799 WorkspaceTaskRunStatus::Cancelled => "cancelled",
1800 WorkspaceTaskRunStatus::TimedOut => "timed_out",
1801 }
1802}
1803
1804fn parse_workspace_task_run_status(value: &str) -> Result<WorkspaceTaskRunStatus> {
1805 match value {
1806 "queued" => Ok(WorkspaceTaskRunStatus::Queued),
1807 "running" => Ok(WorkspaceTaskRunStatus::Running),
1808 "complete" => Ok(WorkspaceTaskRunStatus::Complete),
1809 "failed" => Ok(WorkspaceTaskRunStatus::Failed),
1810 "cancelled" => Ok(WorkspaceTaskRunStatus::Cancelled),
1811 "timed_out" => Ok(WorkspaceTaskRunStatus::TimedOut),
1812 _ => Err(anyhow::anyhow!(
1813 "unsupported workspace task run status: {value}"
1814 )),
1815 }
1816}
1817
1818fn workspace_file_change_action_label(action: WorkspaceFileChangeAction) -> &'static str {
1819 match action {
1820 WorkspaceFileChangeAction::WriteText => "write_text",
1821 WorkspaceFileChangeAction::DeleteFile => "delete_file",
1822 WorkspaceFileChangeAction::RenamePath => "rename_path",
1823 }
1824}
1825
1826fn parse_workspace_file_change_action(value: &str) -> Result<WorkspaceFileChangeAction> {
1827 match value {
1828 "write_text" => Ok(WorkspaceFileChangeAction::WriteText),
1829 "delete_file" => Ok(WorkspaceFileChangeAction::DeleteFile),
1830 "rename_path" => Ok(WorkspaceFileChangeAction::RenamePath),
1831 _ => Err(anyhow::anyhow!(
1832 "unsupported workspace file change action: {value}"
1833 )),
1834 }
1835}
1836
1837fn workspace_file_change_audit_status_label(
1838 status: WorkspaceFileChangeAuditStatus,
1839) -> &'static str {
1840 match status {
1841 WorkspaceFileChangeAuditStatus::Applying => "applying",
1842 WorkspaceFileChangeAuditStatus::Complete => "complete",
1843 WorkspaceFileChangeAuditStatus::Failed => "failed",
1844 }
1845}
1846
1847fn parse_workspace_file_change_audit_status(value: &str) -> Result<WorkspaceFileChangeAuditStatus> {
1848 match value {
1849 "applying" => Ok(WorkspaceFileChangeAuditStatus::Applying),
1850 "complete" => Ok(WorkspaceFileChangeAuditStatus::Complete),
1851 "failed" => Ok(WorkspaceFileChangeAuditStatus::Failed),
1852 _ => Err(anyhow::anyhow!(
1853 "unsupported workspace file change audit status: {value}"
1854 )),
1855 }
1856}
1857
/// Encodes a boolean as an integer: `true` becomes 1 and `false` becomes 0.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1861
1862fn u64_to_i64(value: u64, label: &str) -> Result<i64> {
1863 i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1864}
1865
1866fn usize_to_i64(value: usize, label: &str) -> Result<i64> {
1867 i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1868}
1869
1870fn i64_to_u64(value: i64, label: &str) -> Result<u64> {
1871 u64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1872}
1873
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn unix_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1880
1881#[cfg(test)]
1882mod tests {
1883 use std::path::PathBuf;
1884
1885 use soma_studio_core::{
1886 AppConfig, ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
1887 WorkspaceFileChangeAuditStatus, WorkspaceFileChangePreviewRequest, WorkspaceTaskId,
1888 WorkspaceTaskRunStatus, WorkspaceTaskRunSummary,
1889 };
1890 use turso::Builder;
1891 use uuid::Uuid;
1892
1893 use super::{
1894 DEFAULT_CONVERSATION_TITLE, NewConversationMessage, STORAGE_SCHEMA_VERSION, StudioStorage,
1895 WORKSPACE_HISTORY_RETAIN_PER_SESSION,
1896 };
1897
1898 #[tokio::test]
1899 async fn source_root_roundtrip_persists() {
1900 let temp_dir = std::env::temp_dir().join(format!("soma-studio-storage-{}", Uuid::new_v4()));
1901 std::fs::create_dir_all(&temp_dir).expect("temp dir");
1902
1903 let config = test_config(&temp_dir);
1904 let storage = StudioStorage::open(&config).await.expect("storage");
1905 let source_root = SourceRootSummary {
1906 id: Uuid::new_v4(),
1907 path: "F:/docs".to_string(),
1908 read_only: true,
1909 };
1910
1911 let first = storage
1912 .upsert_source_root(&source_root)
1913 .await
1914 .expect("insert");
1915 let second = storage
1916 .upsert_source_root(&source_root)
1917 .await
1918 .expect("duplicate");
1919 let listed = storage.list_source_roots().await.expect("list");
1920
1921 assert_eq!(first.id, second.id);
1922 assert_eq!(listed.len(), 1);
1923 assert_eq!(listed[0].path, "F:/docs");
1924
1925 let _ = std::fs::remove_dir_all(temp_dir);
1926 }
1927
1928 #[tokio::test]
1929 async fn provider_selection_roundtrip_persists() {
1930 let temp_dir =
1931 std::env::temp_dir().join(format!("soma-studio-selection-{}", Uuid::new_v4()));
1932 std::fs::create_dir_all(&temp_dir).expect("temp dir");
1933
1934 let config = test_config(&temp_dir);
1935 let storage = StudioStorage::open(&config).await.expect("storage");
1936 let saved = storage
1937 .save_provider_selection(&ProviderSelectionResponse {
1938 selected_provider: Some("ollama".to_string()),
1939 selected_model_id: Some("qwen3:8b".to_string()),
1940 })
1941 .await
1942 .expect("save selection");
1943 let loaded = storage
1944 .load_provider_selection()
1945 .await
1946 .expect("load selection");
1947
1948 assert_eq!(saved.selected_provider.as_deref(), Some("ollama"));
1949 assert_eq!(loaded.selected_model_id.as_deref(), Some("qwen3:8b"));
1950
1951 let cleared = storage
1952 .save_provider_selection(&ProviderSelectionResponse {
1953 selected_provider: None,
1954 selected_model_id: None,
1955 })
1956 .await
1957 .expect("clear selection");
1958 assert!(cleared.selected_provider.is_none());
1959 assert!(cleared.selected_model_id.is_none());
1960
1961 let _ = std::fs::remove_dir_all(temp_dir);
1962 }
1963
1964 #[tokio::test]
1965 async fn session_roundtrip_persists() {
1966 let temp_dir = std::env::temp_dir().join(format!("soma-studio-session-{}", Uuid::new_v4()));
1967 std::fs::create_dir_all(&temp_dir).expect("temp dir");
1968
1969 let config = test_config(&temp_dir);
1970 let storage = StudioStorage::open(&config).await.expect("storage");
1971 let session_id = Uuid::new_v4().to_string();
1972
1973 assert!(
1974 !storage
1975 .session_exists(&session_id)
1976 .await
1977 .expect("session missing")
1978 );
1979 storage
1980 .persist_session(&session_id)
1981 .await
1982 .expect("persist session");
1983 assert!(
1984 storage
1985 .session_exists(&session_id)
1986 .await
1987 .expect("session exists")
1988 );
1989
1990 let _ = std::fs::remove_dir_all(temp_dir);
1991 }
1992
1993 #[tokio::test]
1994 async fn workspace_task_run_history_roundtrip_persists() {
1995 let temp_dir =
1996 std::env::temp_dir().join(format!("soma-studio-task-runs-{}", Uuid::new_v4()));
1997 std::fs::create_dir_all(&temp_dir).expect("temp dir");
1998
1999 let config = test_config(&temp_dir);
2000 let storage = StudioStorage::open(&config).await.expect("storage");
2001 let session_id = Uuid::new_v4().to_string();
2002 let run_id = Uuid::new_v4();
2003 let mut summary = WorkspaceTaskRunSummary {
2004 run_id,
2005 task_id: WorkspaceTaskId::GitStatus,
2006 path: ".".to_string(),
2007 status: WorkspaceTaskRunStatus::Running,
2008 command_label: "git status --short --branch".to_string(),
2009 exit_code: None,
2010 stdout_tail: String::new(),
2011 stderr_tail: String::new(),
2012 stdout_truncated: false,
2013 stderr_truncated: false,
2014 timed_out: false,
2015 cancel_requested: false,
2016 started_at_ms: 1_000,
2017 completed_at_ms: None,
2018 duration_ms: None,
2019 error: None,
2020 error_code: None,
2021 max_output_bytes: 64 * 1024,
2022 };
2023
2024 storage
2025 .upsert_workspace_task_run(&session_id, &summary)
2026 .await
2027 .expect("persist running task");
2028 summary.status = WorkspaceTaskRunStatus::Complete;
2029 summary.exit_code = Some(0);
2030 summary.stdout_tail = "## main\n".to_string();
2031 summary.completed_at_ms = Some(1_050);
2032 summary.duration_ms = Some(50);
2033 summary.error_code = None;
2034 storage
2035 .upsert_workspace_task_run(&session_id, &summary)
2036 .await
2037 .expect("persist completed task");
2038
2039 let listed = storage
2040 .list_workspace_task_runs(&session_id, 10, None)
2041 .await
2042 .expect("list task runs");
2043 let loaded = storage
2044 .get_workspace_task_run(&session_id, run_id)
2045 .await
2046 .expect("load task run")
2047 .expect("task run exists");
2048
2049 assert_eq!(listed.len(), 1);
2050 assert_eq!(loaded.run_id, run_id);
2051 assert_eq!(loaded.status, WorkspaceTaskRunStatus::Complete);
2052 assert_eq!(loaded.stdout_tail, "## main\n");
2053 assert_eq!(loaded.duration_ms, Some(50));
2054 assert!(
2055 storage
2056 .list_workspace_task_runs("other-session", 10, None)
2057 .await
2058 .expect("other session task runs")
2059 .is_empty()
2060 );
2061
2062 let interrupted_id = Uuid::new_v4();
2063 let interrupted = WorkspaceTaskRunSummary {
2064 run_id: interrupted_id,
2065 task_id: WorkspaceTaskId::CargoCheck,
2066 path: ".".to_string(),
2067 status: WorkspaceTaskRunStatus::Running,
2068 command_label: "cargo check --workspace".to_string(),
2069 exit_code: None,
2070 stdout_tail: String::new(),
2071 stderr_tail: String::new(),
2072 stdout_truncated: false,
2073 stderr_truncated: false,
2074 timed_out: false,
2075 cancel_requested: false,
2076 started_at_ms: 1_100,
2077 completed_at_ms: None,
2078 duration_ms: None,
2079 error: None,
2080 error_code: None,
2081 max_output_bytes: 64 * 1024,
2082 };
2083 storage
2084 .upsert_workspace_task_run(&session_id, &interrupted)
2085 .await
2086 .expect("persist interrupted candidate");
2087 drop(storage);
2088
2089 let reopened = StudioStorage::open(&config)
2090 .await
2091 .expect("reopened storage");
2092 let interrupted_loaded = reopened
2093 .get_workspace_task_run(&session_id, interrupted_id)
2094 .await
2095 .expect("load interrupted")
2096 .expect("interrupted task exists");
2097
2098 assert_eq!(interrupted_loaded.status, WorkspaceTaskRunStatus::Failed);
2099 assert_eq!(
2100 interrupted_loaded.error.as_deref(),
2101 Some("workspace task interrupted before completion")
2102 );
2103 assert_eq!(
2104 interrupted_loaded.error_code.as_deref(),
2105 Some("workspace_task_interrupted")
2106 );
2107 assert!(interrupted_loaded.completed_at_ms.is_some());
2108 let filtered_interrupted = reopened
2109 .list_workspace_task_runs(&session_id, 10, Some("workspace_task_interrupted"))
2110 .await
2111 .expect("list interrupted task runs");
2112 assert_eq!(filtered_interrupted.len(), 1);
2113 assert_eq!(filtered_interrupted[0].run_id, interrupted_id);
2114 assert!(
2115 reopened
2116 .list_workspace_task_runs(&session_id, 10, Some("workspace_task_failed_exit"))
2117 .await
2118 .expect("list failed exit task runs")
2119 .is_empty()
2120 );
2121
2122 let _ = std::fs::remove_dir_all(temp_dir);
2123 }
2124
2125 #[tokio::test]
2126 async fn workspace_file_change_audit_roundtrip_persists_without_content() {
2127 let temp_dir =
2128 std::env::temp_dir().join(format!("soma-studio-file-audit-{}", Uuid::new_v4()));
2129 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2130
2131 let config = test_config(&temp_dir);
2132 let storage = StudioStorage::open(&config).await.expect("storage");
2133 let session_id = Uuid::new_v4().to_string();
2134 let request = WorkspaceFileChangePreviewRequest {
2135 action: WorkspaceFileChangeAction::WriteText,
2136 path: "docs/new.md".to_string(),
2137 target_path: None,
2138 content: Some("do not persist this content".to_string()),
2139 expected_modified_at_ms: None,
2140 };
2141
2142 let started = storage
2143 .start_workspace_file_change_audit(&session_id, &request, None)
2144 .await
2145 .expect("start file audit");
2146 assert_eq!(started.status, WorkspaceFileChangeAuditStatus::Applying);
2147 assert_eq!(started.path, "docs/new.md");
2148 assert_eq!(started.size_bytes_before, None);
2149
2150 let completed = storage
2151 .finish_workspace_file_change_audit(
2152 &session_id,
2153 started.audit_id,
2154 WorkspaceFileChangeAuditStatus::Complete,
2155 None,
2156 None,
2157 Some(12),
2158 )
2159 .await
2160 .expect("finish file audit");
2161 let listed = storage
2162 .list_workspace_file_change_audits(&session_id, 10, None)
2163 .await
2164 .expect("list file audits");
2165
2166 assert_eq!(completed.status, WorkspaceFileChangeAuditStatus::Complete);
2167 assert_eq!(completed.size_bytes_after, Some(12));
2168 assert_eq!(listed.len(), 1);
2169 assert_eq!(listed[0].audit_id, started.audit_id);
2170 assert_eq!(listed[0].error, None);
2171 assert_eq!(listed[0].error_code, None);
2172 assert!(
2173 storage
2174 .list_workspace_file_change_audits("other-session", 10, None)
2175 .await
2176 .expect("other session file audits")
2177 .is_empty()
2178 );
2179
2180 let interrupted_request = WorkspaceFileChangePreviewRequest {
2181 action: WorkspaceFileChangeAction::RenamePath,
2182 path: "docs/new.md".to_string(),
2183 target_path: Some("docs/renamed.md".to_string()),
2184 content: None,
2185 expected_modified_at_ms: Some(1_000),
2186 };
2187 let interrupted = storage
2188 .start_workspace_file_change_audit(&session_id, &interrupted_request, Some(12))
2189 .await
2190 .expect("start interrupted file audit");
2191 drop(storage);
2192
2193 let reopened = StudioStorage::open(&config)
2194 .await
2195 .expect("reopened storage");
2196 let reopened_audits = reopened
2197 .list_workspace_file_change_audits(&session_id, 10, None)
2198 .await
2199 .expect("reopened file audits");
2200 let interrupted_loaded = reopened_audits
2201 .iter()
2202 .find(|audit| audit.audit_id == interrupted.audit_id)
2203 .expect("interrupted audit exists");
2204
2205 assert_eq!(
2206 interrupted_loaded.status,
2207 WorkspaceFileChangeAuditStatus::Failed
2208 );
2209 assert_eq!(
2210 interrupted_loaded.error.as_deref(),
2211 Some("workspace file change interrupted before completion")
2212 );
2213 assert_eq!(
2214 interrupted_loaded.error_code.as_deref(),
2215 Some("workspace_file_change_interrupted")
2216 );
2217 assert!(interrupted_loaded.applied_at_ms.is_some());
2218 let filtered_interrupted = reopened
2219 .list_workspace_file_change_audits(
2220 &session_id,
2221 10,
2222 Some("workspace_file_change_interrupted"),
2223 )
2224 .await
2225 .expect("list interrupted file audits");
2226 assert_eq!(filtered_interrupted.len(), 1);
2227 assert_eq!(filtered_interrupted[0].audit_id, interrupted.audit_id);
2228 assert!(
2229 reopened
2230 .list_workspace_file_change_audits(&session_id, 10, Some("workspace_conflict"))
2231 .await
2232 .expect("list conflict file audits")
2233 .is_empty()
2234 );
2235
2236 let _ = std::fs::remove_dir_all(temp_dir);
2237 }
2238
2239 #[tokio::test]
2240 async fn workspace_task_run_schema_adds_error_code_to_existing_db() {
2241 let temp_dir =
2242 std::env::temp_dir().join(format!("soma-studio-task-run-migration-{}", Uuid::new_v4()));
2243 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2244
2245 let config = test_config(&temp_dir);
2246 let db_path = config.db_path.to_str().expect("db path");
2247 let db = Builder::new_local(db_path).build().await.expect("db");
2248 let conn = db.connect().expect("connect");
2249 conn.execute_batch(
2250 r#"
2251 CREATE TABLE workspace_task_runs (
2252 run_id TEXT PRIMARY KEY,
2253 session_id TEXT NOT NULL,
2254 task_id TEXT NOT NULL,
2255 path TEXT NOT NULL,
2256 status TEXT NOT NULL,
2257 command_label TEXT NOT NULL,
2258 exit_code INTEGER,
2259 stdout_tail TEXT NOT NULL,
2260 stderr_tail TEXT NOT NULL,
2261 stdout_truncated INTEGER NOT NULL,
2262 stderr_truncated INTEGER NOT NULL,
2263 timed_out INTEGER NOT NULL,
2264 cancel_requested INTEGER NOT NULL,
2265 started_at_ms INTEGER NOT NULL,
2266 completed_at_ms INTEGER,
2267 duration_ms INTEGER,
2268 error TEXT,
2269 max_output_bytes INTEGER NOT NULL,
2270 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
2271 );
2272 "#,
2273 )
2274 .await
2275 .expect("old workspace task schema");
2276 drop(conn);
2277 drop(db);
2278
2279 let storage = StudioStorage::open(&config)
2280 .await
2281 .expect("migrated storage");
2282 let schema_status = storage.schema_status().await.expect("schema status");
2283 assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
2284 assert!(schema_status.is_current());
2285
2286 let session_id = Uuid::new_v4().to_string();
2287 let run_id = Uuid::new_v4();
2288 storage
2289 .upsert_workspace_task_run(
2290 &session_id,
2291 &WorkspaceTaskRunSummary {
2292 run_id,
2293 task_id: WorkspaceTaskId::GitStatus,
2294 path: ".".to_string(),
2295 status: WorkspaceTaskRunStatus::Failed,
2296 command_label: "git status --short --branch".to_string(),
2297 exit_code: Some(1),
2298 stdout_tail: String::new(),
2299 stderr_tail: "failed".to_string(),
2300 stdout_truncated: false,
2301 stderr_truncated: false,
2302 timed_out: false,
2303 cancel_requested: false,
2304 started_at_ms: 1_000,
2305 completed_at_ms: Some(1_010),
2306 duration_ms: Some(10),
2307 error: Some("workspace task exited with code 1".to_string()),
2308 error_code: Some("workspace_task_failed_exit".to_string()),
2309 max_output_bytes: 64 * 1024,
2310 },
2311 )
2312 .await
2313 .expect("persist migrated task run");
2314
2315 let loaded = storage
2316 .get_workspace_task_run(&session_id, run_id)
2317 .await
2318 .expect("load migrated task run")
2319 .expect("migrated task run exists");
2320 assert_eq!(
2321 loaded.error_code.as_deref(),
2322 Some("workspace_task_failed_exit")
2323 );
2324
2325 let _ = std::fs::remove_dir_all(temp_dir);
2326 }
2327
2328 #[tokio::test]
2329 async fn workspace_file_change_audit_schema_adds_error_code_to_existing_db() {
2330 let temp_dir = std::env::temp_dir().join(format!(
2331 "soma-studio-file-audit-migration-{}",
2332 Uuid::new_v4()
2333 ));
2334 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2335
2336 let config = test_config(&temp_dir);
2337 let db_path = config.db_path.to_str().expect("db path");
2338 let db = Builder::new_local(db_path).build().await.expect("db");
2339 let conn = db.connect().expect("connect");
2340 conn.execute_batch(
2341 r#"
2342 CREATE TABLE workspace_file_change_audits (
2343 audit_id TEXT PRIMARY KEY,
2344 session_id TEXT NOT NULL,
2345 action TEXT NOT NULL,
2346 path TEXT NOT NULL,
2347 target_path TEXT,
2348 status TEXT NOT NULL,
2349 error TEXT,
2350 size_bytes_before INTEGER,
2351 size_bytes_after INTEGER,
2352 created_at_ms INTEGER NOT NULL,
2353 applied_at_ms INTEGER,
2354 updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
2355 );
2356 "#,
2357 )
2358 .await
2359 .expect("old workspace file audit schema");
2360 drop(conn);
2361 drop(db);
2362
2363 let storage = StudioStorage::open(&config)
2364 .await
2365 .expect("migrated storage");
2366 let schema_status = storage.schema_status().await.expect("schema status");
2367 assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
2368 assert!(schema_status.is_current());
2369
2370 let session_id = Uuid::new_v4().to_string();
2371 let request = WorkspaceFileChangePreviewRequest {
2372 action: WorkspaceFileChangeAction::RenamePath,
2373 path: "docs/new.md".to_string(),
2374 target_path: Some("docs/renamed.md".to_string()),
2375 content: None,
2376 expected_modified_at_ms: Some(1_000),
2377 };
2378 let started = storage
2379 .start_workspace_file_change_audit(&session_id, &request, Some(10))
2380 .await
2381 .expect("start migrated file audit");
2382 let failed = storage
2383 .finish_workspace_file_change_audit(
2384 &session_id,
2385 started.audit_id,
2386 WorkspaceFileChangeAuditStatus::Failed,
2387 Some("workspace rename target already exists"),
2388 Some("workspace_conflict"),
2389 Some(10),
2390 )
2391 .await
2392 .expect("finish migrated file audit");
2393
2394 assert_eq!(failed.error_code.as_deref(), Some("workspace_conflict"));
2395
2396 let _ = std::fs::remove_dir_all(temp_dir);
2397 }
2398
2399 #[tokio::test]
2400 async fn storage_rejects_newer_schema_version() {
2401 let temp_dir =
2402 std::env::temp_dir().join(format!("soma-studio-newer-schema-{}", Uuid::new_v4()));
2403 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2404
2405 let config = test_config(&temp_dir);
2406 let db_path = config.db_path.to_str().expect("db path");
2407 let db = Builder::new_local(db_path).build().await.expect("db");
2408 let conn = db.connect().expect("connect");
2409 conn.execute(
2410 &format!("PRAGMA user_version = {}", STORAGE_SCHEMA_VERSION + 1),
2411 (),
2412 )
2413 .await
2414 .expect("set newer schema version");
2415 drop(conn);
2416 drop(db);
2417
2418 let error = StudioStorage::open(&config)
2419 .await
2420 .expect_err("newer schema must be rejected");
2421 assert!(
2422 error.to_string().contains("newer than supported version"),
2423 "{error}"
2424 );
2425
2426 let db = Builder::new_local(db_path)
2427 .build()
2428 .await
2429 .expect("db reopen");
2430 let conn = db.connect().expect("connect reopened db");
2431 let mut rows = conn
2432 .query(
2433 "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'source_roots'",
2434 (),
2435 )
2436 .await
2437 .expect("query table count");
2438 let row = rows
2439 .next()
2440 .await
2441 .expect("read table count")
2442 .expect("table count row");
2443 let table_count: i64 = row.get(0).expect("decode table count");
2444 assert_eq!(table_count, 0);
2445 drop(rows);
2446 drop(conn);
2447 drop(db);
2448
2449 let _ = std::fs::remove_dir_all(temp_dir);
2450 }
2451
2452 #[tokio::test]
2453 async fn workspace_history_retention_limits_rows_per_session() {
2454 let temp_dir =
2455 std::env::temp_dir().join(format!("soma-studio-history-retention-{}", Uuid::new_v4()));
2456 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2457
2458 let config = test_config(&temp_dir);
2459 let storage = StudioStorage::open(&config).await.expect("storage");
2460 let session_id = Uuid::new_v4().to_string();
2461 let request = WorkspaceFileChangePreviewRequest {
2462 action: WorkspaceFileChangeAction::DeleteFile,
2463 path: "docs/old.md".to_string(),
2464 target_path: None,
2465 content: None,
2466 expected_modified_at_ms: Some(1_000),
2467 };
2468
2469 for index in 0..205 {
2470 storage
2471 .upsert_workspace_task_run(
2472 &session_id,
2473 &WorkspaceTaskRunSummary {
2474 run_id: Uuid::new_v4(),
2475 task_id: WorkspaceTaskId::GitStatus,
2476 path: ".".to_string(),
2477 status: WorkspaceTaskRunStatus::Complete,
2478 command_label: "git status --short --branch".to_string(),
2479 exit_code: Some(0),
2480 stdout_tail: String::new(),
2481 stderr_tail: String::new(),
2482 stdout_truncated: false,
2483 stderr_truncated: false,
2484 timed_out: false,
2485 cancel_requested: false,
2486 started_at_ms: index,
2487 completed_at_ms: Some(index + 1),
2488 duration_ms: Some(1),
2489 error: None,
2490 error_code: None,
2491 max_output_bytes: 64 * 1024,
2492 },
2493 )
2494 .await
2495 .expect("persist retained task run");
2496 storage
2497 .start_workspace_file_change_audit(&session_id, &request, Some(index))
2498 .await
2499 .expect("persist retained file audit");
2500 }
2501
2502 let task_run_count = count_rows(
2503 &storage,
2504 "SELECT COUNT(*) FROM workspace_task_runs WHERE session_id = ?1",
2505 &session_id,
2506 )
2507 .await;
2508 let file_audit_count = count_rows(
2509 &storage,
2510 "SELECT COUNT(*) FROM workspace_file_change_audits WHERE session_id = ?1",
2511 &session_id,
2512 )
2513 .await;
2514
2515 assert_eq!(task_run_count, WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64);
2516 assert_eq!(
2517 file_audit_count,
2518 WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64
2519 );
2520
2521 let _ = std::fs::remove_dir_all(temp_dir);
2522 }
2523
2524 #[tokio::test]
2525 async fn conversation_and_message_roundtrip_persists() {
2526 let temp_dir =
2527 std::env::temp_dir().join(format!("soma-studio-conversation-{}", Uuid::new_v4()));
2528 std::fs::create_dir_all(&temp_dir).expect("temp dir");
2529
2530 let config = test_config(&temp_dir);
2531 let storage = StudioStorage::open(&config).await.expect("storage");
2532
2533 let conversation = storage
2534 .create_conversation("session-a", None)
2535 .await
2536 .expect("conversation");
2537 assert_eq!(conversation.title, DEFAULT_CONVERSATION_TITLE);
2538
2539 let user_message = storage
2540 .create_message(&NewConversationMessage {
2541 conversation_id: conversation.id.clone(),
2542 role: "user".to_string(),
2543 content: "Discuss the ingest milestone split".to_string(),
2544 status: "complete".to_string(),
2545 provider: None,
2546 model_id: None,
2547 })
2548 .await
2549 .expect("user message");
2550 let assistant_message = storage
2551 .create_message(&NewConversationMessage {
2552 conversation_id: conversation.id.clone(),
2553 role: "assistant".to_string(),
2554 content: String::new(),
2555 status: "streaming".to_string(),
2556 provider: Some("ollama".to_string()),
2557 model_id: Some("qwen3:8b".to_string()),
2558 })
2559 .await
2560 .expect("assistant placeholder");
2561 let partial_assistant = storage
2562 .append_message_delta(&assistant_message.id, "Start with conversation ")
2563 .await
2564 .expect("assistant delta");
2565 let completed_assistant = storage
2566 .update_message_content(
2567 &assistant_message.id,
2568 "Start with conversation persistence and restore APIs.",
2569 "complete",
2570 )
2571 .await
2572 .expect("assistant completion");
2573
2574 let conversations = storage
2575 .list_conversations("session-a")
2576 .await
2577 .expect("conversations");
2578 let messages = storage
2579 .list_conversation_messages("session-a", &conversation.id)
2580 .await
2581 .expect("messages");
2582
2583 assert_eq!(conversations.len(), 1);
2584 assert_eq!(conversations[0].id, conversation.id);
2585 assert_eq!(conversations[0].title, "Discuss the ingest milestone split");
2586 assert_eq!(messages.len(), 2);
2587 assert_eq!(messages[0].id, user_message.id);
2588 assert_eq!(messages[1].id, assistant_message.id);
2589 assert_eq!(partial_assistant.content, "Start with conversation ");
2590 assert_eq!(partial_assistant.status, "streaming");
2591 assert_eq!(completed_assistant.status, "complete");
2592 assert_eq!(completed_assistant.provider.as_deref(), Some("ollama"));
2593 assert!(
2594 storage
2595 .list_conversations("session-b")
2596 .await
2597 .expect("session-b conversations")
2598 .is_empty()
2599 );
2600 assert!(
2601 storage
2602 .list_conversation_messages("session-b", &conversation.id)
2603 .await
2604 .expect("session-b messages")
2605 .is_empty()
2606 );
2607
2608 let deleted = storage
2609 .delete_conversation("session-a", &conversation.id)
2610 .await
2611 .expect("delete conversation");
2612 let messages_after_delete = storage
2613 .list_conversation_messages("session-a", &conversation.id)
2614 .await
2615 .expect("messages after delete");
2616
2617 assert!(deleted);
2618 assert!(messages_after_delete.is_empty());
2619 assert!(
2620 storage
2621 .list_conversations("session-a")
2622 .await
2623 .expect("conversations after delete")
2624 .is_empty()
2625 );
2626
2627 let _ = std::fs::remove_dir_all(temp_dir);
2628 }
2629
2630 fn test_config(temp_dir: &std::path::Path) -> AppConfig {
2631 AppConfig {
2632 app_name: "Soma Studio".to_string(),
2633 bind_addr: "127.0.0.1:0".to_string(),
2634 project_root: temp_dir.to_path_buf(),
2635 data_dir: temp_dir.to_path_buf(),
2636 derived_dir: temp_dir.join("derived"),
2637 notebook_dir: temp_dir.join("notebook"),
2638 user_assets_dir: temp_dir.join("assets"),
2639 db_path: temp_dir.join("test.db"),
2640 web_build_dir: PathBuf::from("unused"),
2641 web_shell_file: PathBuf::from("unused/spa.html"),
2642 }
2643 }
2644
2645 async fn count_rows(storage: &StudioStorage, sql: &str, session_id: &str) -> i64 {
2646 let mut rows = storage
2647 .conn
2648 .query(sql, [session_id])
2649 .await
2650 .expect("count query");
2651 let row = rows
2652 .next()
2653 .await
2654 .expect("count row read")
2655 .expect("count row");
2656 row.get(0).expect("count value")
2657 }
2658}