Skip to main content

soma_studio_server/
storage.rs

1use anyhow::{Context, Result, bail};
2use soma_studio_core::{
3    AppConfig, ConversationMessage, ConversationSummary, IngestJobSummary, IngestStatusResponse,
4    ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
5    WorkspaceFileChangeAuditEntry, WorkspaceFileChangeAuditStatus,
6    WorkspaceFileChangePreviewRequest, WorkspaceTaskId, WorkspaceTaskRunStatus,
7    WorkspaceTaskRunSummary,
8};
9use std::time::{SystemTime, UNIX_EPOCH};
10use turso::{Builder, Connection, Row};
11use uuid::Uuid;
12
// Default conversation title text (its consumers are outside this excerpt).
const DEFAULT_CONVERSATION_TITLE: &str = "New conversation";
// Length limit for conversation titles.
// NOTE(review): unit (bytes vs. characters) is decided at the use site — confirm there.
const CONVERSATION_TITLE_LIMIT: usize = 60;
// Retention count handed to the per-session pruning of workspace task-run
// and file-change-audit history.
const WORKSPACE_HISTORY_RETAIN_PER_SESSION: usize = 200;
/// Schema version this build expects; compared against the database's
/// `PRAGMA user_version` and written back after migrations run.
pub const STORAGE_SCHEMA_VERSION: i64 = 3;
17
/// Storage handle for the studio server, backed by a local Turso database.
///
/// Wraps a single [`Connection`]; obtain an instance through
/// [`StudioStorage::open`], which also prepares the schema.
#[derive(Debug, Clone)]
pub struct StudioStorage {
    // Connection every query in this type goes through.
    conn: Connection,
}
22
/// Field values for a conversation message.
///
/// NOTE(review): the code that consumes this struct is outside this excerpt;
/// the fields presumably map onto the same-named columns of the `messages`
/// table — confirm at the insert site.
#[derive(Debug, Clone)]
pub struct NewConversationMessage {
    pub conversation_id: String,
    pub role: String,
    pub content: String,
    pub status: String,
    /// Optional; the matching `messages.provider` column is nullable.
    pub provider: Option<String>,
    /// Optional; the matching `messages.model_id` column is nullable.
    pub model_id: Option<String>,
}
32
/// Identifying columns of an indexed source file.
///
/// NOTE(review): the query that produces these rows is outside this excerpt;
/// the fields presumably come from the same-named `source_files` columns —
/// confirm at the query site.
#[derive(Debug, Clone)]
pub struct IndexedSourceFileRow {
    pub source_root_id: String,
    pub relative_path: String,
    pub absolute_path: String,
}
39
/// Schema version found in the database next to the version this build expects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageSchemaStatus {
    /// Version read from the database.
    pub current_version: i64,
    /// Version the running build expects.
    pub expected_version: i64,
}

impl StorageSchemaStatus {
    /// Returns `true` only when the stored version equals the expected one.
    pub fn is_current(&self) -> bool {
        let Self {
            current_version,
            expected_version,
        } = self;
        current_version == expected_version
    }
}
51
52impl StudioStorage {
53    pub async fn open(config: &AppConfig) -> Result<Self> {
54        let db_path = config
55            .db_path
56            .to_str()
57            .with_context(|| format!("invalid db path {}", config.db_path.display()))?;
58        let db = Builder::new_local(db_path).build().await.with_context(|| {
59            format!(
60                "failed to open local Turso db at {}",
61                config.db_path.display()
62            )
63        })?;
64        let conn = db
65            .connect()
66            .context("failed to connect to local Turso db")?;
67        let storage = Self { conn };
68        storage.reject_newer_schema_version().await?;
69        storage.initialize_schema().await?;
70        Ok(storage)
71    }
72
73    async fn initialize_schema(&self) -> Result<()> {
74        self.conn
75            .execute_batch(
76                r#"
77                CREATE TABLE IF NOT EXISTS source_roots (
78                    path TEXT PRIMARY KEY,
79                    id TEXT NOT NULL,
80                    read_only INTEGER NOT NULL DEFAULT 1,
81                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
82                );
83
84                CREATE TABLE IF NOT EXISTS provider_status (
85                    provider TEXT PRIMARY KEY,
86                    last_test_ok INTEGER,
87                    last_test_detail TEXT,
88                    last_tested_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
89                );
90
91                CREATE TABLE IF NOT EXISTS provider_selection (
92                    singleton INTEGER PRIMARY KEY CHECK (singleton = 1),
93                    provider TEXT,
94                    model_id TEXT,
95                    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
96                );
97
98                CREATE TABLE IF NOT EXISTS sessions (
99                    id TEXT PRIMARY KEY,
100                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
101                );
102
103                CREATE TABLE IF NOT EXISTS conversations (
104                    id TEXT PRIMARY KEY,
105                    session_id TEXT NOT NULL,
106                    title TEXT NOT NULL,
107                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
108                    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
109                    last_message_at TEXT
110                );
111
112                CREATE TABLE IF NOT EXISTS messages (
113                    id TEXT PRIMARY KEY,
114                    conversation_id TEXT NOT NULL,
115                    sequence INTEGER NOT NULL,
116                    role TEXT NOT NULL,
117                    content TEXT NOT NULL,
118                    status TEXT NOT NULL,
119                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
120                    provider TEXT,
121                    model_id TEXT
122                );
123
124                CREATE INDEX IF NOT EXISTS idx_conversations_recent
125                ON conversations (session_id, last_message_at DESC, updated_at DESC, created_at DESC);
126
127                CREATE INDEX IF NOT EXISTS idx_messages_conversation_created
128                ON messages (conversation_id, sequence);
129
130                CREATE TABLE IF NOT EXISTS source_files (
131                    source_root_id TEXT NOT NULL,
132                    relative_path TEXT NOT NULL,
133                    absolute_path TEXT NOT NULL,
134                    fingerprint TEXT NOT NULL,
135                    size_bytes INTEGER NOT NULL,
136                    modified_at TEXT NOT NULL,
137                    file_type TEXT NOT NULL,
138                    status TEXT NOT NULL,
139                    last_error TEXT,
140                    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
141                    PRIMARY KEY (source_root_id, relative_path)
142                );
143
144                CREATE TABLE IF NOT EXISTS ingest_jobs (
145                    id TEXT PRIMARY KEY,
146                    source_root_id TEXT,
147                    scope_label TEXT NOT NULL,
148                    status TEXT NOT NULL,
149                    file_count INTEGER NOT NULL DEFAULT 0,
150                    started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
151                    completed_at TEXT,
152                    last_error TEXT
153                );
154
155                CREATE INDEX IF NOT EXISTS idx_ingest_jobs_started_at
156                ON ingest_jobs (started_at DESC);
157
158                CREATE TABLE IF NOT EXISTS workspace_task_runs (
159                    run_id TEXT PRIMARY KEY,
160                    session_id TEXT NOT NULL,
161                    task_id TEXT NOT NULL,
162                    path TEXT NOT NULL,
163                    status TEXT NOT NULL,
164                    command_label TEXT NOT NULL,
165                    exit_code INTEGER,
166                    stdout_tail TEXT NOT NULL,
167                    stderr_tail TEXT NOT NULL,
168                    stdout_truncated INTEGER NOT NULL,
169                    stderr_truncated INTEGER NOT NULL,
170                    timed_out INTEGER NOT NULL,
171                    cancel_requested INTEGER NOT NULL,
172                    started_at_ms INTEGER NOT NULL,
173                    completed_at_ms INTEGER,
174                    duration_ms INTEGER,
175                    error TEXT,
176                    error_code TEXT,
177                    max_output_bytes INTEGER NOT NULL,
178                    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
179                );
180
181                CREATE INDEX IF NOT EXISTS idx_workspace_task_runs_recent
182                ON workspace_task_runs (session_id, started_at_ms DESC);
183
184                CREATE TABLE IF NOT EXISTS workspace_file_change_audits (
185                    audit_id TEXT PRIMARY KEY,
186                    session_id TEXT NOT NULL,
187                    action TEXT NOT NULL,
188                    path TEXT NOT NULL,
189                    target_path TEXT,
190                    status TEXT NOT NULL,
191                    error TEXT,
192                    error_code TEXT,
193                    size_bytes_before INTEGER,
194                    size_bytes_after INTEGER,
195                    created_at_ms INTEGER NOT NULL,
196                    applied_at_ms INTEGER,
197                    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
198                );
199
200                CREATE INDEX IF NOT EXISTS idx_workspace_file_change_audits_recent
201                ON workspace_file_change_audits (session_id, created_at_ms DESC);
202                "#,
203            )
204            .await
205            .context("failed to initialize Turso schema")?;
206        self.run_schema_migrations().await?;
207        self.mark_interrupted_workspace_task_runs().await?;
208        self.mark_interrupted_workspace_file_change_audits().await?;
209        Ok(())
210    }
211
212    async fn run_schema_migrations(&self) -> Result<()> {
213        let current_version = self.schema_user_version().await?;
214        if current_version > STORAGE_SCHEMA_VERSION {
215            bail!(
216                "database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
217            );
218        }
219
220        if current_version < 2 {
221            self.ensure_workspace_task_runs_error_code_column().await?;
222        }
223        if current_version < 3 {
224            self.ensure_workspace_file_change_audits_error_code_column()
225                .await?;
226        }
227
228        if current_version != STORAGE_SCHEMA_VERSION {
229            self.set_schema_user_version(STORAGE_SCHEMA_VERSION).await?;
230        }
231        Ok(())
232    }
233
234    async fn reject_newer_schema_version(&self) -> Result<()> {
235        let current_version = self.schema_user_version().await?;
236        if current_version > STORAGE_SCHEMA_VERSION {
237            bail!(
238                "database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
239            );
240        }
241        Ok(())
242    }
243
244    async fn schema_user_version(&self) -> Result<i64> {
245        let mut rows = self
246            .conn
247            .query("PRAGMA user_version", ())
248            .await
249            .context("failed to inspect database schema version")?;
250        let Some(row) = rows
251            .next()
252            .await
253            .context("failed to read database schema version row")?
254        else {
255            bail!("database schema version pragma returned no rows");
256        };
257        row.get(0)
258            .context("failed to decode database schema version")
259    }
260
261    async fn set_schema_user_version(&self, version: i64) -> Result<()> {
262        self.conn
263            .execute(&format!("PRAGMA user_version = {version}"), ())
264            .await
265            .with_context(|| format!("failed to set database schema version to {version}"))?;
266        Ok(())
267    }
268
269    pub async fn schema_status(&self) -> Result<StorageSchemaStatus> {
270        Ok(StorageSchemaStatus {
271            current_version: self.schema_user_version().await?,
272            expected_version: STORAGE_SCHEMA_VERSION,
273        })
274    }
275
276    async fn ensure_workspace_task_runs_error_code_column(&self) -> Result<()> {
277        let mut rows = self
278            .conn
279            .query("PRAGMA table_info(workspace_task_runs)", ())
280            .await
281            .context("failed to inspect workspace task run schema")?;
282        while let Some(row) = rows
283            .next()
284            .await
285            .context("failed to read workspace task run schema row")?
286        {
287            let name: String = row
288                .get(1)
289                .context("failed to decode workspace task run column name")?;
290            if name == "error_code" {
291                return Ok(());
292            }
293        }
294        drop(rows);
295
296        self.conn
297            .execute(
298                "ALTER TABLE workspace_task_runs ADD COLUMN error_code TEXT",
299                (),
300            )
301            .await
302            .context("failed to add workspace task run error_code column")?;
303        Ok(())
304    }
305
306    async fn ensure_workspace_file_change_audits_error_code_column(&self) -> Result<()> {
307        let mut rows = self
308            .conn
309            .query("PRAGMA table_info(workspace_file_change_audits)", ())
310            .await
311            .context("failed to inspect workspace file change audit schema")?;
312        while let Some(row) = rows
313            .next()
314            .await
315            .context("failed to read workspace file change audit schema row")?
316        {
317            let name: String = row
318                .get(1)
319                .context("failed to decode workspace file change audit column name")?;
320            if name == "error_code" {
321                return Ok(());
322            }
323        }
324        drop(rows);
325
326        self.conn
327            .execute(
328                "ALTER TABLE workspace_file_change_audits ADD COLUMN error_code TEXT",
329                (),
330            )
331            .await
332            .context("failed to add workspace file change audit error_code column")?;
333        Ok(())
334    }
335
336    async fn mark_interrupted_workspace_task_runs(&self) -> Result<()> {
337        let now_ms = unix_epoch_ms();
338        self.conn
339            .execute(
340                r#"
341                UPDATE workspace_task_runs
342                SET status = 'failed',
343                    completed_at_ms = COALESCE(completed_at_ms, ?1),
344                    duration_ms = COALESCE(duration_ms, MAX(0, ?1 - started_at_ms)),
345                    error = COALESCE(error, 'workspace task interrupted before completion'),
346                    error_code = COALESCE(error_code, 'workspace_task_interrupted'),
347                    updated_at = CURRENT_TIMESTAMP
348                WHERE status IN ('queued', 'running')
349                "#,
350                [u64_to_i64(now_ms, "workspace task interrupted_at_ms")?],
351            )
352            .await
353            .context("failed to mark interrupted workspace task runs")?;
354        Ok(())
355    }
356
357    async fn mark_interrupted_workspace_file_change_audits(&self) -> Result<()> {
358        let now_ms = unix_epoch_ms();
359        self.conn
360            .execute(
361                r#"
362                UPDATE workspace_file_change_audits
363                SET status = 'failed',
364                    applied_at_ms = COALESCE(applied_at_ms, ?1),
365                    error = COALESCE(error, 'workspace file change interrupted before completion'),
366                    error_code = COALESCE(error_code, 'workspace_file_change_interrupted'),
367                    updated_at = CURRENT_TIMESTAMP
368                WHERE status = 'applying'
369                "#,
370                [u64_to_i64(
371                    now_ms,
372                    "workspace file change interrupted_at_ms",
373                )?],
374            )
375            .await
376            .context("failed to mark interrupted workspace file change audits")?;
377        Ok(())
378    }
379
380    pub async fn list_source_roots(&self) -> Result<Vec<SourceRootSummary>> {
381        let mut rows = self
382            .conn
383            .query(
384                "SELECT id, path, read_only FROM source_roots ORDER BY path",
385                (),
386            )
387            .await
388            .context("failed to query source_roots")?;
389
390        let mut source_roots = Vec::new();
391        while let Some(row) = rows
392            .next()
393            .await
394            .context("failed to read source_root row")?
395        {
396            let id: String = row.get(0).context("failed to decode source_root id")?;
397            let path: String = row.get(1).context("failed to decode source_root path")?;
398            let read_only: i64 = row
399                .get(2)
400                .context("failed to decode source_root read_only")?;
401            source_roots.push(SourceRootSummary {
402                id: Uuid::parse_str(&id)
403                    .with_context(|| format!("invalid source_root UUID {id}"))?,
404                path,
405                read_only: read_only != 0,
406            });
407        }
408
409        Ok(source_roots)
410    }
411
412    pub async fn upsert_source_root(
413        &self,
414        summary: &SourceRootSummary,
415    ) -> Result<SourceRootSummary> {
416        self.conn
417            .execute(
418                "INSERT OR IGNORE INTO source_roots (path, id, read_only) VALUES (?1, ?2, ?3)",
419                (
420                    summary.path.clone(),
421                    summary.id.to_string(),
422                    if summary.read_only { 1_i64 } else { 0_i64 },
423                ),
424            )
425            .await
426            .with_context(|| format!("failed to persist source root {}", summary.path))?;
427
428        self.find_source_root(&summary.path)
429            .await?
430            .with_context(|| format!("persisted source root missing for {}", summary.path))
431    }
432
433    async fn find_source_root(&self, path: &str) -> Result<Option<SourceRootSummary>> {
434        let mut stmt = self
435            .conn
436            .prepare("SELECT id, path, read_only FROM source_roots WHERE path = ?1 LIMIT 1")
437            .await
438            .context("failed to prepare source_root lookup")?;
439        let mut rows = stmt
440            .query([path])
441            .await
442            .with_context(|| format!("failed to lookup source root {path}"))?;
443
444        let Some(row) = rows.next().await.context("failed to read lookup row")? else {
445            return Ok(None);
446        };
447
448        let id: String = row.get(0).context("failed to decode lookup id")?;
449        let stored_path: String = row.get(1).context("failed to decode lookup path")?;
450        let read_only: i64 = row.get(2).context("failed to decode lookup read_only")?;
451
452        Ok(Some(SourceRootSummary {
453            id: Uuid::parse_str(&id).with_context(|| format!("invalid persisted UUID {id}"))?,
454            path: stored_path,
455            read_only: read_only != 0,
456        }))
457    }
458
459    pub async fn record_provider_test(&self, provider: &str, ok: bool, detail: &str) -> Result<()> {
460        self.conn
461            .execute(
462                r#"
463                INSERT INTO provider_status (provider, last_test_ok, last_test_detail, last_tested_at)
464                VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP)
465                ON CONFLICT(provider) DO UPDATE SET
466                    last_test_ok = excluded.last_test_ok,
467                    last_test_detail = excluded.last_test_detail,
468                    last_tested_at = CURRENT_TIMESTAMP
469                "#,
470                (provider.to_string(), if ok { 1_i64 } else { 0_i64 }, detail.to_string()),
471            )
472            .await
473            .with_context(|| format!("failed to persist provider status for {provider}"))?;
474        Ok(())
475    }
476
477    pub async fn list_provider_statuses(&self) -> Result<Vec<ProviderStatusRow>> {
478        let mut rows = self
479            .conn
480            .query(
481                "SELECT provider, last_test_ok, last_test_detail, last_tested_at FROM provider_status",
482                (),
483            )
484            .await
485            .context("failed to query provider_status")?;
486
487        let mut statuses = Vec::new();
488        while let Some(row) = rows
489            .next()
490            .await
491            .context("failed to read provider_status row")?
492        {
493            let provider: String = row.get(0).context("failed to decode provider name")?;
494            let last_test_ok: Option<i64> = row.get(1).ok();
495            let last_test_detail: Option<String> = row.get(2).ok();
496            let last_tested_at: String =
497                row.get(3).context("failed to decode provider timestamp")?;
498            statuses.push(ProviderStatusRow {
499                provider,
500                last_test_ok: last_test_ok.map(|value| value != 0),
501                last_test_detail,
502                last_tested_at: Some(last_tested_at),
503            });
504        }
505
506        Ok(statuses)
507    }
508
509    pub async fn load_provider_selection(&self) -> Result<ProviderSelectionResponse> {
510        let mut rows = self
511            .conn
512            .query(
513                "SELECT provider, model_id FROM provider_selection WHERE singleton = 1",
514                (),
515            )
516            .await
517            .context("failed to query provider_selection")?;
518
519        let Some(row) = rows
520            .next()
521            .await
522            .context("failed to read provider_selection row")?
523        else {
524            return Ok(ProviderSelectionResponse {
525                selected_provider: None,
526                selected_model_id: None,
527            });
528        };
529
530        let selected_provider: Option<String> = row.get(0).ok();
531        let selected_model_id: Option<String> = row.get(1).ok();
532        Ok(ProviderSelectionResponse {
533            selected_provider,
534            selected_model_id,
535        })
536    }
537
538    pub async fn save_provider_selection(
539        &self,
540        selection: &ProviderSelectionResponse,
541    ) -> Result<ProviderSelectionResponse> {
542        self.conn
543            .execute(
544                r#"
545                INSERT INTO provider_selection (singleton, provider, model_id, updated_at)
546                VALUES (1, ?1, ?2, CURRENT_TIMESTAMP)
547                ON CONFLICT(singleton) DO UPDATE SET
548                    provider = excluded.provider,
549                    model_id = excluded.model_id,
550                    updated_at = CURRENT_TIMESTAMP
551                "#,
552                (
553                    selection.selected_provider.clone(),
554                    selection.selected_model_id.clone(),
555                ),
556            )
557            .await
558            .context("failed to persist provider selection")?;
559
560        self.load_provider_selection().await
561    }
562
563    pub async fn persist_session(&self, session_id: &str) -> Result<()> {
564        self.conn
565            .execute(
566                "INSERT OR IGNORE INTO sessions (id) VALUES (?1)",
567                [session_id],
568            )
569            .await
570            .with_context(|| format!("failed to persist session {session_id}"))?;
571        Ok(())
572    }
573
574    pub async fn session_exists(&self, session_id: &str) -> Result<bool> {
575        let mut stmt = self
576            .conn
577            .prepare("SELECT id FROM sessions WHERE id = ?1 LIMIT 1")
578            .await
579            .context("failed to prepare session lookup")?;
580        let mut rows = stmt
581            .query([session_id])
582            .await
583            .with_context(|| format!("failed to lookup session {session_id}"))?;
584
585        Ok(rows
586            .next()
587            .await
588            .context("failed to read session row")?
589            .is_some())
590    }
591
592    pub async fn upsert_workspace_task_run(
593        &self,
594        session_id: &str,
595        summary: &WorkspaceTaskRunSummary,
596    ) -> Result<()> {
597        self.conn
598            .execute(
599                r#"
600                INSERT INTO workspace_task_runs (
601                    run_id, session_id, task_id, path, status, command_label, exit_code,
602                    stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
603                    cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
604                    error_code, max_output_bytes, updated_at
605                )
606                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, CURRENT_TIMESTAMP)
607                ON CONFLICT(run_id) DO UPDATE SET
608                    task_id = excluded.task_id,
609                    path = excluded.path,
610                    status = excluded.status,
611                    command_label = excluded.command_label,
612                    exit_code = excluded.exit_code,
613                    stdout_tail = excluded.stdout_tail,
614                    stderr_tail = excluded.stderr_tail,
615                    stdout_truncated = excluded.stdout_truncated,
616                    stderr_truncated = excluded.stderr_truncated,
617                    timed_out = excluded.timed_out,
618                    cancel_requested = excluded.cancel_requested,
619                    started_at_ms = excluded.started_at_ms,
620                    completed_at_ms = excluded.completed_at_ms,
621                    duration_ms = excluded.duration_ms,
622                    error = excluded.error,
623                    error_code = excluded.error_code,
624                    max_output_bytes = excluded.max_output_bytes,
625                    updated_at = CURRENT_TIMESTAMP
626                "#,
627                turso::params![
628                    summary.run_id.to_string(),
629                    session_id.to_string(),
630                    workspace_task_id_label(summary.task_id).to_string(),
631                    summary.path.clone(),
632                    workspace_task_run_status_label(summary.status).to_string(),
633                    summary.command_label.clone(),
634                    summary.exit_code.map(i64::from),
635                    summary.stdout_tail.clone(),
636                    summary.stderr_tail.clone(),
637                    bool_to_i64(summary.stdout_truncated),
638                    bool_to_i64(summary.stderr_truncated),
639                    bool_to_i64(summary.timed_out),
640                    bool_to_i64(summary.cancel_requested),
641                    u64_to_i64(summary.started_at_ms, "workspace task started_at_ms")?,
642                    summary
643                        .completed_at_ms
644                        .map(|value| u64_to_i64(value, "workspace task completed_at_ms"))
645                        .transpose()?,
646                    summary
647                        .duration_ms
648                        .map(|value| u64_to_i64(value, "workspace task duration_ms"))
649                        .transpose()?,
650                    summary.error.clone(),
651                    summary.error_code.clone(),
652                    usize_to_i64(summary.max_output_bytes, "workspace task max_output_bytes")?,
653                ],
654            )
655            .await
656            .with_context(|| format!("failed to persist workspace task run {}", summary.run_id))?;
657        self.prune_workspace_task_run_history(session_id, WORKSPACE_HISTORY_RETAIN_PER_SESSION)
658            .await?;
659        Ok(())
660    }
661
662    pub async fn list_workspace_task_runs(
663        &self,
664        session_id: &str,
665        limit: usize,
666        error_code: Option<&str>,
667    ) -> Result<Vec<WorkspaceTaskRunSummary>> {
668        let limit = usize_to_i64(limit.clamp(1, 200), "workspace task run list limit")?;
669        let error_code = error_code.map(str::to_string);
670        let mut rows = self
671            .conn
672            .query(
673                r#"
674                SELECT run_id, task_id, path, status, command_label, exit_code,
675                    stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
676                    cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
677                    error_code, max_output_bytes
678                FROM workspace_task_runs
679                WHERE session_id = ?1
680                    AND (?2 IS NULL OR error_code = ?2)
681                ORDER BY started_at_ms DESC, run_id DESC
682                LIMIT ?3
683                "#,
684                (session_id.to_string(), error_code, limit),
685            )
686            .await
687            .context("failed to query workspace task runs")?;
688
689        let mut summaries = Vec::new();
690        while let Some(row) = rows
691            .next()
692            .await
693            .context("failed to read workspace task run row")?
694        {
695            summaries.push(decode_workspace_task_run_summary(&row)?);
696        }
697        Ok(summaries)
698    }
699
700    pub async fn get_workspace_task_run(
701        &self,
702        session_id: &str,
703        run_id: Uuid,
704    ) -> Result<Option<WorkspaceTaskRunSummary>> {
705        let mut stmt = self
706            .conn
707            .prepare(
708                r#"
709                SELECT run_id, task_id, path, status, command_label, exit_code,
710                    stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
711                    cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
712                    error_code, max_output_bytes
713                FROM workspace_task_runs
714                WHERE session_id = ?1 AND run_id = ?2
715                LIMIT 1
716                "#,
717            )
718            .await
719            .context("failed to prepare workspace task run lookup")?;
720        let mut rows = stmt
721            .query((session_id.to_string(), run_id.to_string()))
722            .await
723            .with_context(|| format!("failed to lookup workspace task run {run_id}"))?;
724
725        let Some(row) = rows
726            .next()
727            .await
728            .context("failed to read workspace task run lookup row")?
729        else {
730            return Ok(None);
731        };
732        Ok(Some(decode_workspace_task_run_summary(&row)?))
733    }
734
735    pub async fn start_workspace_file_change_audit(
736        &self,
737        session_id: &str,
738        request: &WorkspaceFileChangePreviewRequest,
739        size_bytes_before: Option<u64>,
740    ) -> Result<WorkspaceFileChangeAuditEntry> {
741        let audit_id = Uuid::new_v4();
742        let created_at_ms = unix_epoch_ms();
743        self.conn
744            .execute(
745                r#"
746                INSERT INTO workspace_file_change_audits (
747                    audit_id, session_id, action, path, target_path, status, error,
748                    error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms, updated_at
749                )
750                VALUES (?1, ?2, ?3, ?4, ?5, 'applying', NULL, NULL, ?6, NULL, ?7, NULL, CURRENT_TIMESTAMP)
751                "#,
752                turso::params![
753                    audit_id.to_string(),
754                    session_id.to_string(),
755                    workspace_file_change_action_label(request.action).to_string(),
756                    request.path.clone(),
757                    request.target_path.clone(),
758                    size_bytes_before
759                        .map(|value| u64_to_i64(value, "workspace file change size_bytes_before"))
760                        .transpose()?,
761                    u64_to_i64(created_at_ms, "workspace file change created_at_ms")?,
762                ],
763            )
764            .await
765            .with_context(|| {
766                format!(
767                    "failed to start workspace file change audit for {}",
768                    request.path
769                )
770            })?;
771        self.prune_workspace_file_change_audit_history(
772            session_id,
773            WORKSPACE_HISTORY_RETAIN_PER_SESSION,
774        )
775        .await?;
776
777        self.find_workspace_file_change_audit(session_id, audit_id)
778            .await?
779            .with_context(|| format!("created workspace file change audit missing for {audit_id}"))
780    }
781
782    pub async fn finish_workspace_file_change_audit(
783        &self,
784        session_id: &str,
785        audit_id: Uuid,
786        status: WorkspaceFileChangeAuditStatus,
787        error: Option<&str>,
788        error_code: Option<&str>,
789        size_bytes_after: Option<u64>,
790    ) -> Result<WorkspaceFileChangeAuditEntry> {
791        if matches!(status, WorkspaceFileChangeAuditStatus::Applying) {
792            anyhow::bail!("workspace file change audit finish status must be terminal");
793        }
794        let applied_at_ms = unix_epoch_ms();
795        self.conn
796            .execute(
797                r#"
798                UPDATE workspace_file_change_audits
799                SET status = ?3,
800                    error = ?4,
801                    error_code = ?5,
802                    size_bytes_after = ?6,
803                    applied_at_ms = ?7,
804                    updated_at = CURRENT_TIMESTAMP
805                WHERE session_id = ?1 AND audit_id = ?2
806                "#,
807                turso::params![
808                    session_id.to_string(),
809                    audit_id.to_string(),
810                    workspace_file_change_audit_status_label(status).to_string(),
811                    error.map(str::to_string),
812                    error_code.map(str::to_string),
813                    size_bytes_after
814                        .map(|value| u64_to_i64(value, "workspace file change size_bytes_after"))
815                        .transpose()?,
816                    u64_to_i64(applied_at_ms, "workspace file change applied_at_ms")?,
817                ],
818            )
819            .await
820            .with_context(|| format!("failed to finish workspace file change audit {audit_id}"))?;
821
822        self.find_workspace_file_change_audit(session_id, audit_id)
823            .await?
824            .with_context(|| format!("finished workspace file change audit missing for {audit_id}"))
825    }
826
827    pub async fn list_workspace_file_change_audits(
828        &self,
829        session_id: &str,
830        limit: usize,
831        error_code: Option<&str>,
832    ) -> Result<Vec<WorkspaceFileChangeAuditEntry>> {
833        let limit = usize_to_i64(
834            limit.clamp(1, 200),
835            "workspace file change audit list limit",
836        )?;
837        let error_code = error_code.map(str::to_string);
838        let mut rows = self
839            .conn
840            .query(
841                r#"
842                SELECT audit_id, action, path, target_path, status, error,
843                    error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
844                FROM workspace_file_change_audits
845                WHERE session_id = ?1
846                    AND (?2 IS NULL OR error_code = ?2)
847                ORDER BY created_at_ms DESC, audit_id DESC
848                LIMIT ?3
849                "#,
850                (session_id.to_string(), error_code, limit),
851            )
852            .await
853            .context("failed to query workspace file change audits")?;
854
855        let mut entries = Vec::new();
856        while let Some(row) = rows
857            .next()
858            .await
859            .context("failed to read workspace file change audit row")?
860        {
861            entries.push(decode_workspace_file_change_audit_entry(&row)?);
862        }
863        Ok(entries)
864    }
865
866    async fn find_workspace_file_change_audit(
867        &self,
868        session_id: &str,
869        audit_id: Uuid,
870    ) -> Result<Option<WorkspaceFileChangeAuditEntry>> {
871        let mut stmt = self
872            .conn
873            .prepare(
874                r#"
875                SELECT audit_id, action, path, target_path, status, error,
876                    error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
877                FROM workspace_file_change_audits
878                WHERE session_id = ?1 AND audit_id = ?2
879                LIMIT 1
880                "#,
881            )
882            .await
883            .context("failed to prepare workspace file change audit lookup")?;
884        let mut rows = stmt
885            .query((session_id.to_string(), audit_id.to_string()))
886            .await
887            .with_context(|| format!("failed to lookup workspace file change audit {audit_id}"))?;
888
889        let Some(row) = rows
890            .next()
891            .await
892            .context("failed to read workspace file change audit lookup row")?
893        else {
894            return Ok(None);
895        };
896        Ok(Some(decode_workspace_file_change_audit_entry(&row)?))
897    }
898
899    async fn prune_workspace_task_run_history(
900        &self,
901        session_id: &str,
902        retain_limit: usize,
903    ) -> Result<()> {
904        let retain_limit = usize_to_i64(retain_limit.max(1), "workspace task retention limit")?;
905        self.conn
906            .execute(
907                r#"
908                DELETE FROM workspace_task_runs
909                WHERE session_id = ?1
910                    AND run_id NOT IN (
911                        SELECT run_id
912                        FROM workspace_task_runs
913                        WHERE session_id = ?1
914                        ORDER BY started_at_ms DESC, run_id DESC
915                        LIMIT ?2
916                    )
917                "#,
918                (session_id.to_string(), retain_limit),
919            )
920            .await
921            .with_context(|| format!("failed to prune workspace task history for {session_id}"))?;
922        Ok(())
923    }
924
925    async fn prune_workspace_file_change_audit_history(
926        &self,
927        session_id: &str,
928        retain_limit: usize,
929    ) -> Result<()> {
930        let retain_limit =
931            usize_to_i64(retain_limit.max(1), "workspace file audit retention limit")?;
932        self.conn
933            .execute(
934                r#"
935                DELETE FROM workspace_file_change_audits
936                WHERE session_id = ?1
937                    AND audit_id NOT IN (
938                        SELECT audit_id
939                        FROM workspace_file_change_audits
940                        WHERE session_id = ?1
941                        ORDER BY created_at_ms DESC, audit_id DESC
942                        LIMIT ?2
943                    )
944                "#,
945                (session_id.to_string(), retain_limit),
946            )
947            .await
948            .with_context(|| {
949                format!("failed to prune workspace file change audit history for {session_id}")
950            })?;
951        Ok(())
952    }
953
954    pub async fn create_conversation(
955        &self,
956        session_id: &str,
957        title: Option<&str>,
958    ) -> Result<ConversationSummary> {
959        let conversation_id = Uuid::new_v4().to_string();
960        self.insert_conversation(session_id, &conversation_id, title)
961            .await?;
962        self.find_conversation(session_id, &conversation_id)
963            .await?
964            .with_context(|| format!("persisted conversation missing for {conversation_id}"))
965    }
966
967    pub async fn ensure_conversation(
968        &self,
969        session_id: &str,
970        conversation_id: &str,
971        title_hint: Option<&str>,
972    ) -> Result<ConversationSummary> {
973        self.insert_conversation(session_id, conversation_id, title_hint)
974            .await?;
975        self.find_conversation(session_id, conversation_id)
976            .await?
977            .with_context(|| format!("persisted conversation missing for {conversation_id}"))
978    }
979
980    pub async fn list_conversations(&self, session_id: &str) -> Result<Vec<ConversationSummary>> {
981        let mut rows = self
982            .conn
983            .query(
984                r#"
985                SELECT id, title, created_at, updated_at, last_message_at
986                FROM conversations
987                WHERE session_id = ?1
988                ORDER BY COALESCE(last_message_at, updated_at, created_at) DESC, id DESC
989                "#,
990                [session_id],
991            )
992            .await
993            .context("failed to query conversations")?;
994
995        let mut conversations = Vec::new();
996        while let Some(row) = rows
997            .next()
998            .await
999            .context("failed to read conversation row")?
1000        {
1001            conversations.push(decode_conversation_row(&row)?);
1002        }
1003
1004        Ok(conversations)
1005    }
1006
1007    pub async fn find_conversation(
1008        &self,
1009        session_id: &str,
1010        conversation_id: &str,
1011    ) -> Result<Option<ConversationSummary>> {
1012        let mut stmt = self
1013            .conn
1014            .prepare(
1015                r#"
1016                SELECT id, title, created_at, updated_at, last_message_at
1017                FROM conversations
1018                WHERE session_id = ?1 AND id = ?2
1019                LIMIT 1
1020                "#,
1021            )
1022            .await
1023            .context("failed to prepare conversation lookup")?;
1024        let mut rows = stmt
1025            .query((session_id.to_string(), conversation_id.to_string()))
1026            .await
1027            .with_context(|| format!("failed to lookup conversation {conversation_id}"))?;
1028
1029        let Some(row) = rows
1030            .next()
1031            .await
1032            .context("failed to read conversation lookup row")?
1033        else {
1034            return Ok(None);
1035        };
1036
1037        Ok(Some(decode_conversation_row(&row)?))
1038    }
1039
1040    pub async fn delete_conversation(
1041        &self,
1042        session_id: &str,
1043        conversation_id: &str,
1044    ) -> Result<bool> {
1045        if self
1046            .find_conversation(session_id, conversation_id)
1047            .await?
1048            .is_none()
1049        {
1050            return Ok(false);
1051        }
1052
1053        self.conn
1054            .execute(
1055                "DELETE FROM messages WHERE conversation_id = ?1",
1056                [conversation_id],
1057            )
1058            .await
1059            .with_context(|| {
1060                format!("failed to delete messages for conversation {conversation_id}")
1061            })?;
1062
1063        let deleted = self
1064            .conn
1065            .execute("DELETE FROM conversations WHERE id = ?1", [conversation_id])
1066            .await
1067            .with_context(|| format!("failed to delete conversation {conversation_id}"))?;
1068
1069        Ok(deleted > 0)
1070    }
1071
1072    pub async fn list_conversation_messages(
1073        &self,
1074        session_id: &str,
1075        conversation_id: &str,
1076    ) -> Result<Vec<ConversationMessage>> {
1077        if self
1078            .find_conversation(session_id, conversation_id)
1079            .await?
1080            .is_none()
1081        {
1082            return Ok(Vec::new());
1083        }
1084
1085        let mut stmt = self
1086            .conn
1087            .prepare(
1088                r#"
1089                SELECT id, conversation_id, role, content, status, created_at, provider, model_id
1090                FROM messages
1091                WHERE conversation_id = ?1
1092                ORDER BY sequence ASC, id ASC
1093                "#,
1094            )
1095            .await
1096            .context("failed to prepare message list query")?;
1097        let mut rows = stmt.query([conversation_id]).await.with_context(|| {
1098            format!("failed to list messages for conversation {conversation_id}")
1099        })?;
1100
1101        let mut messages = Vec::new();
1102        while let Some(row) = rows.next().await.context("failed to read message row")? {
1103            messages.push(decode_message_row(&row)?);
1104        }
1105
1106        Ok(messages)
1107    }
1108
1109    pub async fn create_message(
1110        &self,
1111        message: &NewConversationMessage,
1112    ) -> Result<ConversationMessage> {
1113        let message_id = Uuid::new_v4().to_string();
1114        let sequence = self.next_message_sequence(&message.conversation_id).await?;
1115        self.conn
1116            .execute(
1117                r#"
1118                INSERT INTO messages (id, conversation_id, sequence, role, content, status, provider, model_id)
1119                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1120                "#,
1121                (
1122                    message_id.clone(),
1123                    message.conversation_id.clone(),
1124                    sequence,
1125                    message.role.clone(),
1126                    message.content.clone(),
1127                    message.status.clone(),
1128                    message.provider.clone(),
1129                    message.model_id.clone(),
1130                ),
1131            )
1132            .await
1133            .with_context(|| format!("failed to persist message for conversation {}", message.conversation_id))?;
1134
1135        self.touch_conversation_activity(
1136            &message.conversation_id,
1137            message.role.eq("user").then_some(message.content.as_str()),
1138        )
1139        .await?;
1140
1141        self.find_message(&message_id)
1142            .await?
1143            .with_context(|| format!("persisted message missing for {message_id}"))
1144    }
1145
1146    pub async fn update_message_content(
1147        &self,
1148        message_id: &str,
1149        content: &str,
1150        status: &str,
1151    ) -> Result<ConversationMessage> {
1152        let conversation_id = self
1153            .message_conversation_id(message_id)
1154            .await?
1155            .with_context(|| format!("message not found for update: {message_id}"))?;
1156
1157        self.conn
1158            .execute(
1159                "UPDATE messages SET content = ?2, status = ?3 WHERE id = ?1",
1160                (
1161                    message_id.to_string(),
1162                    content.to_string(),
1163                    status.to_string(),
1164                ),
1165            )
1166            .await
1167            .with_context(|| format!("failed to update message {message_id}"))?;
1168
1169        self.touch_conversation_activity(&conversation_id, None)
1170            .await?;
1171
1172        self.find_message(message_id)
1173            .await?
1174            .with_context(|| format!("updated message missing for {message_id}"))
1175    }
1176
1177    pub async fn append_message_delta(
1178        &self,
1179        message_id: &str,
1180        delta: &str,
1181    ) -> Result<ConversationMessage> {
1182        let conversation_id = self
1183            .message_conversation_id(message_id)
1184            .await?
1185            .with_context(|| format!("message not found for delta append: {message_id}"))?;
1186
1187        self.conn
1188            .execute(
1189                "UPDATE messages SET content = content || ?2, status = 'streaming' WHERE id = ?1",
1190                (message_id.to_string(), delta.to_string()),
1191            )
1192            .await
1193            .with_context(|| format!("failed to append delta to message {message_id}"))?;
1194
1195        self.touch_conversation_activity(&conversation_id, None)
1196            .await?;
1197
1198        self.find_message(message_id)
1199            .await?
1200            .with_context(|| format!("delta-updated message missing for {message_id}"))
1201    }
1202
1203    pub async fn replace_source_files(
1204        &self,
1205        source_root_id: &str,
1206        files: &[crate::ingest::ScannedSourceFile],
1207    ) -> Result<()> {
1208        self.conn
1209            .execute(
1210                "DELETE FROM source_files WHERE source_root_id = ?1",
1211                [source_root_id],
1212            )
1213            .await
1214            .with_context(|| format!("failed to clear source files for {source_root_id}"))?;
1215
1216        for file in files {
1217            self.conn
1218                .execute(
1219                    r#"
1220                    INSERT INTO source_files (
1221                        source_root_id, relative_path, absolute_path, fingerprint,
1222                        size_bytes, modified_at, file_type, status, last_error, updated_at
1223                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, CURRENT_TIMESTAMP)
1224                    "#,
1225                    (
1226                        file.source_root_id.clone(),
1227                        file.relative_path.clone(),
1228                        file.absolute_path.clone(),
1229                        file.fingerprint.clone(),
1230                        file.size_bytes,
1231                        file.modified_at.clone(),
1232                        file.file_type.clone(),
1233                        file.status.clone(),
1234                        file.last_error.clone(),
1235                    ),
1236                )
1237                .await
1238                .with_context(|| format!("failed to persist source file {}", file.relative_path))?;
1239        }
1240
1241        Ok(())
1242    }
1243
1244    pub async fn create_ingest_job(
1245        &self,
1246        source_root_id: Option<&str>,
1247        scope_label: &str,
1248    ) -> Result<IngestJobSummary> {
1249        let job_id = Uuid::new_v4().to_string();
1250        self.conn
1251            .execute(
1252                r#"
1253                INSERT INTO ingest_jobs (id, source_root_id, scope_label, status, file_count, started_at)
1254                VALUES (?1, ?2, ?3, 'running', 0, CURRENT_TIMESTAMP)
1255                "#,
1256                (
1257                    job_id.clone(),
1258                    source_root_id.map(str::to_string),
1259                    scope_label.to_string(),
1260                ),
1261            )
1262            .await
1263            .with_context(|| format!("failed to create ingest job for {scope_label}"))?;
1264        self.find_ingest_job(&job_id)
1265            .await?
1266            .with_context(|| format!("created ingest job missing for {job_id}"))
1267    }
1268
1269    pub async fn complete_ingest_job(
1270        &self,
1271        job_id: &str,
1272        status: &str,
1273        file_count: usize,
1274        last_error: Option<&str>,
1275    ) -> Result<IngestJobSummary> {
1276        self.conn
1277            .execute(
1278                r#"
1279                UPDATE ingest_jobs
1280                SET status = ?2, file_count = ?3, completed_at = CURRENT_TIMESTAMP, last_error = ?4
1281                WHERE id = ?1
1282                "#,
1283                (
1284                    job_id.to_string(),
1285                    status.to_string(),
1286                    file_count as i64,
1287                    last_error.map(str::to_string),
1288                ),
1289            )
1290            .await
1291            .with_context(|| format!("failed to complete ingest job {job_id}"))?;
1292        self.find_ingest_job(job_id)
1293            .await?
1294            .with_context(|| format!("completed ingest job missing for {job_id}"))
1295    }
1296
1297    pub async fn list_ingest_jobs(&self) -> Result<Vec<IngestJobSummary>> {
1298        let mut rows = self
1299            .conn
1300            .query(
1301                "SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs ORDER BY started_at DESC LIMIT 20",
1302                (),
1303            )
1304            .await
1305            .context("failed to query ingest jobs")?;
1306
1307        let mut jobs = Vec::new();
1308        while let Some(row) = rows.next().await.context("failed to read ingest job row")? {
1309            jobs.push(decode_ingest_job_row(&row)?);
1310        }
1311
1312        Ok(jobs)
1313    }
1314
1315    pub async fn load_ingest_status(&self) -> Result<IngestStatusResponse> {
1316        let jobs = self.list_ingest_jobs().await?;
1317        let running = jobs.iter().any(|job| job.status == "running");
1318        let total_source_files = self.count_source_files().await?;
1319        let indexed_text_files = self.count_indexed_text_files().await?;
1320        Ok(IngestStatusResponse {
1321            running,
1322            total_source_files,
1323            indexed_text_files,
1324            jobs,
1325        })
1326    }
1327
1328    async fn find_ingest_job(&self, job_id: &str) -> Result<Option<IngestJobSummary>> {
1329        let mut stmt = self
1330            .conn
1331            .prepare(
1332                "SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs WHERE id = ?1 LIMIT 1",
1333            )
1334            .await
1335            .context("failed to prepare ingest job lookup")?;
1336        let mut rows = stmt
1337            .query([job_id])
1338            .await
1339            .with_context(|| format!("failed to lookup ingest job {job_id}"))?;
1340
1341        let Some(row) = rows
1342            .next()
1343            .await
1344            .context("failed to read ingest job lookup row")?
1345        else {
1346            return Ok(None);
1347        };
1348
1349        Ok(Some(decode_ingest_job_row(&row)?))
1350    }
1351
1352    async fn count_source_files(&self) -> Result<usize> {
1353        let mut rows = self
1354            .conn
1355            .query("SELECT COUNT(*) FROM source_files", ())
1356            .await
1357            .context("failed to count source files")?;
1358        let Some(row) = rows
1359            .next()
1360            .await
1361            .context("failed to read source file count")?
1362        else {
1363            return Ok(0);
1364        };
1365        let count: i64 = row.get(0).context("failed to decode source file count")?;
1366        Ok(count as usize)
1367    }
1368
1369    async fn count_indexed_text_files(&self) -> Result<usize> {
1370        let mut rows = self
1371            .conn
1372            .query(
1373                "SELECT COUNT(*) FROM source_files WHERE status = 'indexed'",
1374                (),
1375            )
1376            .await
1377            .context("failed to count indexed text files")?;
1378        let Some(row) = rows
1379            .next()
1380            .await
1381            .context("failed to read indexed text file count")?
1382        else {
1383            return Ok(0);
1384        };
1385        let count: i64 = row
1386            .get(0)
1387            .context("failed to decode indexed text file count")?;
1388        Ok(count as usize)
1389    }
1390
1391    pub async fn list_indexed_source_files(&self) -> Result<Vec<IndexedSourceFileRow>> {
1392        let mut rows = self
1393            .conn
1394            .query(
1395                "SELECT source_root_id, relative_path, absolute_path FROM source_files WHERE status = 'indexed' ORDER BY source_root_id, relative_path",
1396                (),
1397            )
1398            .await
1399            .context("failed to query indexed source files")?;
1400
1401        let mut files = Vec::new();
1402        while let Some(row) = rows
1403            .next()
1404            .await
1405            .context("failed to read indexed source file row")?
1406        {
1407            files.push(IndexedSourceFileRow {
1408                source_root_id: row
1409                    .get(0)
1410                    .context("failed to decode indexed source root id")?,
1411                relative_path: row
1412                    .get(1)
1413                    .context("failed to decode indexed source relative path")?,
1414                absolute_path: row
1415                    .get(2)
1416                    .context("failed to decode indexed source absolute path")?,
1417            });
1418        }
1419
1420        Ok(files)
1421    }
1422
1423    async fn insert_conversation(
1424        &self,
1425        session_id: &str,
1426        conversation_id: &str,
1427        title: Option<&str>,
1428    ) -> Result<()> {
1429        let normalized_title = normalize_conversation_title(title);
1430        self.conn
1431            .execute(
1432                r#"
1433                INSERT OR IGNORE INTO conversations (id, session_id, title, created_at, updated_at, last_message_at)
1434                VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL)
1435                "#,
1436                (
1437                    conversation_id.to_string(),
1438                    session_id.to_string(),
1439                    normalized_title.clone(),
1440                ),
1441            )
1442            .await
1443            .with_context(|| format!("failed to create conversation {conversation_id}"))?;
1444
1445        if self
1446            .find_conversation(session_id, conversation_id)
1447            .await?
1448            .is_none()
1449        {
1450            anyhow::bail!("conversation belongs to a different session");
1451        }
1452
1453        self.conn
1454            .execute(
1455                r#"
1456                UPDATE conversations
1457                SET title = CASE
1458                    WHEN title = ?2 AND ?3 IS NOT NULL THEN ?3
1459                    ELSE title
1460                END
1461                WHERE id = ?1
1462                "#,
1463                (
1464                    conversation_id.to_string(),
1465                    DEFAULT_CONVERSATION_TITLE.to_string(),
1466                    title.map(conversation_title_from_message),
1467                ),
1468            )
1469            .await
1470            .with_context(|| {
1471                format!("failed to normalize title for conversation {conversation_id}")
1472            })?;
1473
1474        Ok(())
1475    }
1476
1477    async fn next_message_sequence(&self, conversation_id: &str) -> Result<i64> {
1478        let mut stmt = self
1479            .conn
1480            .prepare(
1481                "SELECT COALESCE(MAX(sequence), 0) + 1 FROM messages WHERE conversation_id = ?1",
1482            )
1483            .await
1484            .context("failed to prepare message sequence lookup")?;
1485        let mut rows = stmt.query([conversation_id]).await.with_context(|| {
1486            format!("failed to query next message sequence for {conversation_id}")
1487        })?;
1488
1489        let Some(row) = rows
1490            .next()
1491            .await
1492            .context("failed to read message sequence row")?
1493        else {
1494            return Ok(1);
1495        };
1496
1497        let sequence: i64 = row.get(0).context("failed to decode message sequence")?;
1498        Ok(sequence)
1499    }
1500
1501    async fn touch_conversation_activity(
1502        &self,
1503        conversation_id: &str,
1504        title_hint: Option<&str>,
1505    ) -> Result<()> {
1506        self.conn
1507            .execute(
1508                r#"
1509                UPDATE conversations
1510                SET title = CASE
1511                    WHEN ?2 IS NOT NULL AND title = ?3 THEN ?2
1512                    ELSE title
1513                END,
1514                updated_at = CURRENT_TIMESTAMP,
1515                last_message_at = CURRENT_TIMESTAMP
1516                WHERE id = ?1
1517                "#,
1518                (
1519                    conversation_id.to_string(),
1520                    title_hint.map(conversation_title_from_message),
1521                    DEFAULT_CONVERSATION_TITLE.to_string(),
1522                ),
1523            )
1524            .await
1525            .with_context(|| format!("failed to touch conversation {conversation_id}"))?;
1526        Ok(())
1527    }
1528
1529    async fn find_message(&self, message_id: &str) -> Result<Option<ConversationMessage>> {
1530        let mut stmt = self
1531            .conn
1532            .prepare(
1533                r#"
1534                SELECT id, conversation_id, role, content, status, created_at, provider, model_id
1535                FROM messages
1536                WHERE id = ?1
1537                LIMIT 1
1538                "#,
1539            )
1540            .await
1541            .context("failed to prepare message lookup")?;
1542        let mut rows = stmt
1543            .query([message_id])
1544            .await
1545            .with_context(|| format!("failed to lookup message {message_id}"))?;
1546
1547        let Some(row) = rows
1548            .next()
1549            .await
1550            .context("failed to read message lookup row")?
1551        else {
1552            return Ok(None);
1553        };
1554
1555        Ok(Some(decode_message_row(&row)?))
1556    }
1557
1558    async fn message_conversation_id(&self, message_id: &str) -> Result<Option<String>> {
1559        let mut stmt = self
1560            .conn
1561            .prepare("SELECT conversation_id FROM messages WHERE id = ?1 LIMIT 1")
1562            .await
1563            .context("failed to prepare message conversation lookup")?;
1564        let mut rows = stmt
1565            .query([message_id])
1566            .await
1567            .with_context(|| format!("failed to lookup message conversation for {message_id}"))?;
1568
1569        let Some(row) = rows
1570            .next()
1571            .await
1572            .context("failed to read message conversation row")?
1573        else {
1574            return Ok(None);
1575        };
1576
1577        let conversation_id: String = row
1578            .get(0)
1579            .context("failed to decode message conversation id")?;
1580        Ok(Some(conversation_id))
1581    }
1582}
1583
/// Plain data record describing a provider and its last test result.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type. The field notes below are inferred from the field names and types
/// only; confirm them against the query that fills the struct.
#[derive(Debug, Clone)]
pub struct ProviderStatusRow {
    /// Provider identifier (presumably the provider's name or key — TODO confirm).
    pub provider: String,
    /// Outcome of the last test, if any (presumably `None` means never tested — TODO confirm).
    pub last_test_ok: Option<bool>,
    /// Optional detail text for the last test (presumably a message or error — TODO confirm).
    pub last_test_detail: Option<String>,
    /// Optional timestamp of the last test, stored as a string (format not shown here).
    pub last_tested_at: Option<String>,
}
1591
1592fn decode_ingest_job_row(row: &Row) -> Result<IngestJobSummary> {
1593    Ok(IngestJobSummary {
1594        id: row.get(0).context("failed to decode ingest job id")?,
1595        source_root_id: row.get(1).ok(),
1596        scope_label: row.get(2).context("failed to decode ingest scope label")?,
1597        status: row.get(3).context("failed to decode ingest status")?,
1598        file_count: row
1599            .get::<i64>(4)
1600            .context("failed to decode ingest file count")? as usize,
1601        started_at: row.get(5).context("failed to decode ingest started_at")?,
1602        completed_at: row.get(6).ok(),
1603        last_error: row.get(7).ok(),
1604    })
1605}
1606
1607fn decode_conversation_row(row: &Row) -> Result<ConversationSummary> {
1608    Ok(ConversationSummary {
1609        id: row.get(0).context("failed to decode conversation id")?,
1610        title: row.get(1).context("failed to decode conversation title")?,
1611        created_at: row
1612            .get(2)
1613            .context("failed to decode conversation created_at")?,
1614        updated_at: row
1615            .get(3)
1616            .context("failed to decode conversation updated_at")?,
1617        last_message_at: row.get(4).ok(),
1618    })
1619}
1620
1621fn decode_message_row(row: &Row) -> Result<ConversationMessage> {
1622    Ok(ConversationMessage {
1623        id: row.get(0).context("failed to decode message id")?,
1624        conversation_id: row
1625            .get(1)
1626            .context("failed to decode message conversation_id")?,
1627        role: row.get(2).context("failed to decode message role")?,
1628        content: row.get(3).context("failed to decode message content")?,
1629        status: row.get(4).context("failed to decode message status")?,
1630        created_at: row.get(5).context("failed to decode message created_at")?,
1631        provider: row.get(6).ok(),
1632        model_id: row.get(7).ok(),
1633    })
1634}
1635
1636fn decode_workspace_task_run_summary(row: &Row) -> Result<WorkspaceTaskRunSummary> {
1637    let run_id: String = row
1638        .get(0)
1639        .context("failed to decode workspace task run id")?;
1640    let task_id: String = row
1641        .get(1)
1642        .context("failed to decode workspace task run task_id")?;
1643    let status: String = row
1644        .get(3)
1645        .context("failed to decode workspace task run status")?;
1646    let exit_code: Option<i64> = row.get(5).ok();
1647    let stdout_truncated: i64 = row
1648        .get(8)
1649        .context("failed to decode workspace task stdout_truncated")?;
1650    let stderr_truncated: i64 = row
1651        .get(9)
1652        .context("failed to decode workspace task stderr_truncated")?;
1653    let timed_out: i64 = row
1654        .get(10)
1655        .context("failed to decode workspace task timed_out")?;
1656    let cancel_requested: i64 = row
1657        .get(11)
1658        .context("failed to decode workspace task cancel_requested")?;
1659    let started_at_ms: i64 = row
1660        .get(12)
1661        .context("failed to decode workspace task started_at_ms")?;
1662    let completed_at_ms: Option<i64> = row.get(13).ok();
1663    let duration_ms: Option<i64> = row.get(14).ok();
1664    let max_output_bytes: i64 = row
1665        .get(17)
1666        .context("failed to decode workspace task max_output_bytes")?;
1667
1668    Ok(WorkspaceTaskRunSummary {
1669        run_id: Uuid::parse_str(&run_id).with_context(|| format!("invalid run UUID {run_id}"))?,
1670        task_id: parse_workspace_task_id(&task_id)?,
1671        path: row.get(2).context("failed to decode workspace task path")?,
1672        status: parse_workspace_task_run_status(&status)?,
1673        command_label: row
1674            .get(4)
1675            .context("failed to decode workspace task command_label")?,
1676        exit_code: exit_code
1677            .map(|value| {
1678                i32::try_from(value)
1679                    .with_context(|| format!("workspace task exit_code out of range: {value}"))
1680            })
1681            .transpose()?,
1682        stdout_tail: row
1683            .get(6)
1684            .context("failed to decode workspace task stdout_tail")?,
1685        stderr_tail: row
1686            .get(7)
1687            .context("failed to decode workspace task stderr_tail")?,
1688        stdout_truncated: stdout_truncated != 0,
1689        stderr_truncated: stderr_truncated != 0,
1690        timed_out: timed_out != 0,
1691        cancel_requested: cancel_requested != 0,
1692        started_at_ms: i64_to_u64(started_at_ms, "workspace task started_at_ms")?,
1693        completed_at_ms: completed_at_ms
1694            .map(|value| i64_to_u64(value, "workspace task completed_at_ms"))
1695            .transpose()?,
1696        duration_ms: duration_ms
1697            .map(|value| i64_to_u64(value, "workspace task duration_ms"))
1698            .transpose()?,
1699        error: row.get(15).ok(),
1700        error_code: row.get(16).ok(),
1701        max_output_bytes: usize::try_from(max_output_bytes).with_context(|| {
1702            format!("workspace task max_output_bytes out of range: {max_output_bytes}")
1703        })?,
1704    })
1705}
1706
1707fn decode_workspace_file_change_audit_entry(row: &Row) -> Result<WorkspaceFileChangeAuditEntry> {
1708    let audit_id: String = row
1709        .get(0)
1710        .context("failed to decode workspace file change audit id")?;
1711    let action: String = row
1712        .get(1)
1713        .context("failed to decode workspace file change action")?;
1714    let status: String = row
1715        .get(4)
1716        .context("failed to decode workspace file change status")?;
1717    let size_bytes_before: Option<i64> = row.get(7).ok();
1718    let size_bytes_after: Option<i64> = row.get(8).ok();
1719    let created_at_ms: i64 = row
1720        .get(9)
1721        .context("failed to decode workspace file change created_at_ms")?;
1722    let applied_at_ms: Option<i64> = row.get(10).ok();
1723
1724    Ok(WorkspaceFileChangeAuditEntry {
1725        audit_id: Uuid::parse_str(&audit_id)
1726            .with_context(|| format!("invalid workspace file change audit UUID {audit_id}"))?,
1727        action: parse_workspace_file_change_action(&action)?,
1728        path: row
1729            .get(2)
1730            .context("failed to decode workspace file change path")?,
1731        target_path: row.get(3).ok(),
1732        status: parse_workspace_file_change_audit_status(&status)?,
1733        error: row.get(5).ok(),
1734        error_code: row.get(6).ok(),
1735        size_bytes_before: size_bytes_before
1736            .map(|value| i64_to_u64(value, "workspace file change size_bytes_before"))
1737            .transpose()?,
1738        size_bytes_after: size_bytes_after
1739            .map(|value| i64_to_u64(value, "workspace file change size_bytes_after"))
1740            .transpose()?,
1741        created_at_ms: i64_to_u64(created_at_ms, "workspace file change created_at_ms")?,
1742        applied_at_ms: applied_at_ms
1743            .map(|value| i64_to_u64(value, "workspace file change applied_at_ms"))
1744            .transpose()?,
1745    })
1746}
1747
1748fn normalize_conversation_title(title: Option<&str>) -> String {
1749    title
1750        .map(str::trim)
1751        .filter(|value| !value.is_empty())
1752        .map(conversation_title_from_message)
1753        .unwrap_or_else(|| DEFAULT_CONVERSATION_TITLE.to_string())
1754}
1755
1756fn conversation_title_from_message(message: &str) -> String {
1757    let trimmed = message.trim();
1758    if trimmed.is_empty() {
1759        return DEFAULT_CONVERSATION_TITLE.to_string();
1760    }
1761
1762    let mut title = String::new();
1763    for ch in trimmed.chars().take(CONVERSATION_TITLE_LIMIT) {
1764        title.push(ch);
1765    }
1766
1767    if trimmed.chars().count() > CONVERSATION_TITLE_LIMIT {
1768        title.push_str("...");
1769    }
1770
1771    title
1772}
1773
1774fn workspace_task_id_label(task_id: WorkspaceTaskId) -> &'static str {
1775    match task_id {
1776        WorkspaceTaskId::GitStatus => "git_status",
1777        WorkspaceTaskId::GitDiff => "git_diff",
1778        WorkspaceTaskId::CargoCheck => "cargo_check",
1779        WorkspaceTaskId::NpmCheck => "npm_check",
1780    }
1781}
1782
1783fn parse_workspace_task_id(value: &str) -> Result<WorkspaceTaskId> {
1784    match value {
1785        "git_status" => Ok(WorkspaceTaskId::GitStatus),
1786        "git_diff" => Ok(WorkspaceTaskId::GitDiff),
1787        "cargo_check" => Ok(WorkspaceTaskId::CargoCheck),
1788        "npm_check" => Ok(WorkspaceTaskId::NpmCheck),
1789        _ => Err(anyhow::anyhow!("unsupported workspace task id: {value}")),
1790    }
1791}
1792
1793fn workspace_task_run_status_label(status: WorkspaceTaskRunStatus) -> &'static str {
1794    match status {
1795        WorkspaceTaskRunStatus::Queued => "queued",
1796        WorkspaceTaskRunStatus::Running => "running",
1797        WorkspaceTaskRunStatus::Complete => "complete",
1798        WorkspaceTaskRunStatus::Failed => "failed",
1799        WorkspaceTaskRunStatus::Cancelled => "cancelled",
1800        WorkspaceTaskRunStatus::TimedOut => "timed_out",
1801    }
1802}
1803
1804fn parse_workspace_task_run_status(value: &str) -> Result<WorkspaceTaskRunStatus> {
1805    match value {
1806        "queued" => Ok(WorkspaceTaskRunStatus::Queued),
1807        "running" => Ok(WorkspaceTaskRunStatus::Running),
1808        "complete" => Ok(WorkspaceTaskRunStatus::Complete),
1809        "failed" => Ok(WorkspaceTaskRunStatus::Failed),
1810        "cancelled" => Ok(WorkspaceTaskRunStatus::Cancelled),
1811        "timed_out" => Ok(WorkspaceTaskRunStatus::TimedOut),
1812        _ => Err(anyhow::anyhow!(
1813            "unsupported workspace task run status: {value}"
1814        )),
1815    }
1816}
1817
1818fn workspace_file_change_action_label(action: WorkspaceFileChangeAction) -> &'static str {
1819    match action {
1820        WorkspaceFileChangeAction::WriteText => "write_text",
1821        WorkspaceFileChangeAction::DeleteFile => "delete_file",
1822        WorkspaceFileChangeAction::RenamePath => "rename_path",
1823    }
1824}
1825
1826fn parse_workspace_file_change_action(value: &str) -> Result<WorkspaceFileChangeAction> {
1827    match value {
1828        "write_text" => Ok(WorkspaceFileChangeAction::WriteText),
1829        "delete_file" => Ok(WorkspaceFileChangeAction::DeleteFile),
1830        "rename_path" => Ok(WorkspaceFileChangeAction::RenamePath),
1831        _ => Err(anyhow::anyhow!(
1832            "unsupported workspace file change action: {value}"
1833        )),
1834    }
1835}
1836
1837fn workspace_file_change_audit_status_label(
1838    status: WorkspaceFileChangeAuditStatus,
1839) -> &'static str {
1840    match status {
1841        WorkspaceFileChangeAuditStatus::Applying => "applying",
1842        WorkspaceFileChangeAuditStatus::Complete => "complete",
1843        WorkspaceFileChangeAuditStatus::Failed => "failed",
1844    }
1845}
1846
1847fn parse_workspace_file_change_audit_status(value: &str) -> Result<WorkspaceFileChangeAuditStatus> {
1848    match value {
1849        "applying" => Ok(WorkspaceFileChangeAuditStatus::Applying),
1850        "complete" => Ok(WorkspaceFileChangeAuditStatus::Complete),
1851        "failed" => Ok(WorkspaceFileChangeAuditStatus::Failed),
1852        _ => Err(anyhow::anyhow!(
1853            "unsupported workspace file change audit status: {value}"
1854        )),
1855    }
1856}
1857
/// Encodes a boolean as an integer: `true` becomes 1 and `false` becomes 0.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1861
1862fn u64_to_i64(value: u64, label: &str) -> Result<i64> {
1863    i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1864}
1865
1866fn usize_to_i64(value: usize, label: &str) -> Result<i64> {
1867    i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1868}
1869
1870fn i64_to_u64(value: i64, label: &str) -> Result<u64> {
1871    u64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
1872}
1873
/// Returns the current system time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn unix_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1880
1881#[cfg(test)]
1882mod tests {
1883    use std::path::PathBuf;
1884
1885    use soma_studio_core::{
1886        AppConfig, ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
1887        WorkspaceFileChangeAuditStatus, WorkspaceFileChangePreviewRequest, WorkspaceTaskId,
1888        WorkspaceTaskRunStatus, WorkspaceTaskRunSummary,
1889    };
1890    use turso::Builder;
1891    use uuid::Uuid;
1892
1893    use super::{
1894        DEFAULT_CONVERSATION_TITLE, NewConversationMessage, STORAGE_SCHEMA_VERSION, StudioStorage,
1895        WORKSPACE_HISTORY_RETAIN_PER_SESSION,
1896    };
1897
1898    #[tokio::test]
1899    async fn source_root_roundtrip_persists() {
1900        let temp_dir = std::env::temp_dir().join(format!("soma-studio-storage-{}", Uuid::new_v4()));
1901        std::fs::create_dir_all(&temp_dir).expect("temp dir");
1902
1903        let config = test_config(&temp_dir);
1904        let storage = StudioStorage::open(&config).await.expect("storage");
1905        let source_root = SourceRootSummary {
1906            id: Uuid::new_v4(),
1907            path: "F:/docs".to_string(),
1908            read_only: true,
1909        };
1910
1911        let first = storage
1912            .upsert_source_root(&source_root)
1913            .await
1914            .expect("insert");
1915        let second = storage
1916            .upsert_source_root(&source_root)
1917            .await
1918            .expect("duplicate");
1919        let listed = storage.list_source_roots().await.expect("list");
1920
1921        assert_eq!(first.id, second.id);
1922        assert_eq!(listed.len(), 1);
1923        assert_eq!(listed[0].path, "F:/docs");
1924
1925        let _ = std::fs::remove_dir_all(temp_dir);
1926    }
1927
1928    #[tokio::test]
1929    async fn provider_selection_roundtrip_persists() {
1930        let temp_dir =
1931            std::env::temp_dir().join(format!("soma-studio-selection-{}", Uuid::new_v4()));
1932        std::fs::create_dir_all(&temp_dir).expect("temp dir");
1933
1934        let config = test_config(&temp_dir);
1935        let storage = StudioStorage::open(&config).await.expect("storage");
1936        let saved = storage
1937            .save_provider_selection(&ProviderSelectionResponse {
1938                selected_provider: Some("ollama".to_string()),
1939                selected_model_id: Some("qwen3:8b".to_string()),
1940            })
1941            .await
1942            .expect("save selection");
1943        let loaded = storage
1944            .load_provider_selection()
1945            .await
1946            .expect("load selection");
1947
1948        assert_eq!(saved.selected_provider.as_deref(), Some("ollama"));
1949        assert_eq!(loaded.selected_model_id.as_deref(), Some("qwen3:8b"));
1950
1951        let cleared = storage
1952            .save_provider_selection(&ProviderSelectionResponse {
1953                selected_provider: None,
1954                selected_model_id: None,
1955            })
1956            .await
1957            .expect("clear selection");
1958        assert!(cleared.selected_provider.is_none());
1959        assert!(cleared.selected_model_id.is_none());
1960
1961        let _ = std::fs::remove_dir_all(temp_dir);
1962    }
1963
1964    #[tokio::test]
1965    async fn session_roundtrip_persists() {
1966        let temp_dir = std::env::temp_dir().join(format!("soma-studio-session-{}", Uuid::new_v4()));
1967        std::fs::create_dir_all(&temp_dir).expect("temp dir");
1968
1969        let config = test_config(&temp_dir);
1970        let storage = StudioStorage::open(&config).await.expect("storage");
1971        let session_id = Uuid::new_v4().to_string();
1972
1973        assert!(
1974            !storage
1975                .session_exists(&session_id)
1976                .await
1977                .expect("session missing")
1978        );
1979        storage
1980            .persist_session(&session_id)
1981            .await
1982            .expect("persist session");
1983        assert!(
1984            storage
1985                .session_exists(&session_id)
1986                .await
1987                .expect("session exists")
1988        );
1989
1990        let _ = std::fs::remove_dir_all(temp_dir);
1991    }
1992
1993    #[tokio::test]
1994    async fn workspace_task_run_history_roundtrip_persists() {
1995        let temp_dir =
1996            std::env::temp_dir().join(format!("soma-studio-task-runs-{}", Uuid::new_v4()));
1997        std::fs::create_dir_all(&temp_dir).expect("temp dir");
1998
1999        let config = test_config(&temp_dir);
2000        let storage = StudioStorage::open(&config).await.expect("storage");
2001        let session_id = Uuid::new_v4().to_string();
2002        let run_id = Uuid::new_v4();
2003        let mut summary = WorkspaceTaskRunSummary {
2004            run_id,
2005            task_id: WorkspaceTaskId::GitStatus,
2006            path: ".".to_string(),
2007            status: WorkspaceTaskRunStatus::Running,
2008            command_label: "git status --short --branch".to_string(),
2009            exit_code: None,
2010            stdout_tail: String::new(),
2011            stderr_tail: String::new(),
2012            stdout_truncated: false,
2013            stderr_truncated: false,
2014            timed_out: false,
2015            cancel_requested: false,
2016            started_at_ms: 1_000,
2017            completed_at_ms: None,
2018            duration_ms: None,
2019            error: None,
2020            error_code: None,
2021            max_output_bytes: 64 * 1024,
2022        };
2023
2024        storage
2025            .upsert_workspace_task_run(&session_id, &summary)
2026            .await
2027            .expect("persist running task");
2028        summary.status = WorkspaceTaskRunStatus::Complete;
2029        summary.exit_code = Some(0);
2030        summary.stdout_tail = "## main\n".to_string();
2031        summary.completed_at_ms = Some(1_050);
2032        summary.duration_ms = Some(50);
2033        summary.error_code = None;
2034        storage
2035            .upsert_workspace_task_run(&session_id, &summary)
2036            .await
2037            .expect("persist completed task");
2038
2039        let listed = storage
2040            .list_workspace_task_runs(&session_id, 10, None)
2041            .await
2042            .expect("list task runs");
2043        let loaded = storage
2044            .get_workspace_task_run(&session_id, run_id)
2045            .await
2046            .expect("load task run")
2047            .expect("task run exists");
2048
2049        assert_eq!(listed.len(), 1);
2050        assert_eq!(loaded.run_id, run_id);
2051        assert_eq!(loaded.status, WorkspaceTaskRunStatus::Complete);
2052        assert_eq!(loaded.stdout_tail, "## main\n");
2053        assert_eq!(loaded.duration_ms, Some(50));
2054        assert!(
2055            storage
2056                .list_workspace_task_runs("other-session", 10, None)
2057                .await
2058                .expect("other session task runs")
2059                .is_empty()
2060        );
2061
2062        let interrupted_id = Uuid::new_v4();
2063        let interrupted = WorkspaceTaskRunSummary {
2064            run_id: interrupted_id,
2065            task_id: WorkspaceTaskId::CargoCheck,
2066            path: ".".to_string(),
2067            status: WorkspaceTaskRunStatus::Running,
2068            command_label: "cargo check --workspace".to_string(),
2069            exit_code: None,
2070            stdout_tail: String::new(),
2071            stderr_tail: String::new(),
2072            stdout_truncated: false,
2073            stderr_truncated: false,
2074            timed_out: false,
2075            cancel_requested: false,
2076            started_at_ms: 1_100,
2077            completed_at_ms: None,
2078            duration_ms: None,
2079            error: None,
2080            error_code: None,
2081            max_output_bytes: 64 * 1024,
2082        };
2083        storage
2084            .upsert_workspace_task_run(&session_id, &interrupted)
2085            .await
2086            .expect("persist interrupted candidate");
2087        drop(storage);
2088
2089        let reopened = StudioStorage::open(&config)
2090            .await
2091            .expect("reopened storage");
2092        let interrupted_loaded = reopened
2093            .get_workspace_task_run(&session_id, interrupted_id)
2094            .await
2095            .expect("load interrupted")
2096            .expect("interrupted task exists");
2097
2098        assert_eq!(interrupted_loaded.status, WorkspaceTaskRunStatus::Failed);
2099        assert_eq!(
2100            interrupted_loaded.error.as_deref(),
2101            Some("workspace task interrupted before completion")
2102        );
2103        assert_eq!(
2104            interrupted_loaded.error_code.as_deref(),
2105            Some("workspace_task_interrupted")
2106        );
2107        assert!(interrupted_loaded.completed_at_ms.is_some());
2108        let filtered_interrupted = reopened
2109            .list_workspace_task_runs(&session_id, 10, Some("workspace_task_interrupted"))
2110            .await
2111            .expect("list interrupted task runs");
2112        assert_eq!(filtered_interrupted.len(), 1);
2113        assert_eq!(filtered_interrupted[0].run_id, interrupted_id);
2114        assert!(
2115            reopened
2116                .list_workspace_task_runs(&session_id, 10, Some("workspace_task_failed_exit"))
2117                .await
2118                .expect("list failed exit task runs")
2119                .is_empty()
2120        );
2121
2122        let _ = std::fs::remove_dir_all(temp_dir);
2123    }
2124
2125    #[tokio::test]
2126    async fn workspace_file_change_audit_roundtrip_persists_without_content() {
2127        let temp_dir =
2128            std::env::temp_dir().join(format!("soma-studio-file-audit-{}", Uuid::new_v4()));
2129        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2130
2131        let config = test_config(&temp_dir);
2132        let storage = StudioStorage::open(&config).await.expect("storage");
2133        let session_id = Uuid::new_v4().to_string();
2134        let request = WorkspaceFileChangePreviewRequest {
2135            action: WorkspaceFileChangeAction::WriteText,
2136            path: "docs/new.md".to_string(),
2137            target_path: None,
2138            content: Some("do not persist this content".to_string()),
2139            expected_modified_at_ms: None,
2140        };
2141
2142        let started = storage
2143            .start_workspace_file_change_audit(&session_id, &request, None)
2144            .await
2145            .expect("start file audit");
2146        assert_eq!(started.status, WorkspaceFileChangeAuditStatus::Applying);
2147        assert_eq!(started.path, "docs/new.md");
2148        assert_eq!(started.size_bytes_before, None);
2149
2150        let completed = storage
2151            .finish_workspace_file_change_audit(
2152                &session_id,
2153                started.audit_id,
2154                WorkspaceFileChangeAuditStatus::Complete,
2155                None,
2156                None,
2157                Some(12),
2158            )
2159            .await
2160            .expect("finish file audit");
2161        let listed = storage
2162            .list_workspace_file_change_audits(&session_id, 10, None)
2163            .await
2164            .expect("list file audits");
2165
2166        assert_eq!(completed.status, WorkspaceFileChangeAuditStatus::Complete);
2167        assert_eq!(completed.size_bytes_after, Some(12));
2168        assert_eq!(listed.len(), 1);
2169        assert_eq!(listed[0].audit_id, started.audit_id);
2170        assert_eq!(listed[0].error, None);
2171        assert_eq!(listed[0].error_code, None);
2172        assert!(
2173            storage
2174                .list_workspace_file_change_audits("other-session", 10, None)
2175                .await
2176                .expect("other session file audits")
2177                .is_empty()
2178        );
2179
2180        let interrupted_request = WorkspaceFileChangePreviewRequest {
2181            action: WorkspaceFileChangeAction::RenamePath,
2182            path: "docs/new.md".to_string(),
2183            target_path: Some("docs/renamed.md".to_string()),
2184            content: None,
2185            expected_modified_at_ms: Some(1_000),
2186        };
2187        let interrupted = storage
2188            .start_workspace_file_change_audit(&session_id, &interrupted_request, Some(12))
2189            .await
2190            .expect("start interrupted file audit");
2191        drop(storage);
2192
2193        let reopened = StudioStorage::open(&config)
2194            .await
2195            .expect("reopened storage");
2196        let reopened_audits = reopened
2197            .list_workspace_file_change_audits(&session_id, 10, None)
2198            .await
2199            .expect("reopened file audits");
2200        let interrupted_loaded = reopened_audits
2201            .iter()
2202            .find(|audit| audit.audit_id == interrupted.audit_id)
2203            .expect("interrupted audit exists");
2204
2205        assert_eq!(
2206            interrupted_loaded.status,
2207            WorkspaceFileChangeAuditStatus::Failed
2208        );
2209        assert_eq!(
2210            interrupted_loaded.error.as_deref(),
2211            Some("workspace file change interrupted before completion")
2212        );
2213        assert_eq!(
2214            interrupted_loaded.error_code.as_deref(),
2215            Some("workspace_file_change_interrupted")
2216        );
2217        assert!(interrupted_loaded.applied_at_ms.is_some());
2218        let filtered_interrupted = reopened
2219            .list_workspace_file_change_audits(
2220                &session_id,
2221                10,
2222                Some("workspace_file_change_interrupted"),
2223            )
2224            .await
2225            .expect("list interrupted file audits");
2226        assert_eq!(filtered_interrupted.len(), 1);
2227        assert_eq!(filtered_interrupted[0].audit_id, interrupted.audit_id);
2228        assert!(
2229            reopened
2230                .list_workspace_file_change_audits(&session_id, 10, Some("workspace_conflict"))
2231                .await
2232                .expect("list conflict file audits")
2233                .is_empty()
2234        );
2235
2236        let _ = std::fs::remove_dir_all(temp_dir);
2237    }
2238
2239    #[tokio::test]
2240    async fn workspace_task_run_schema_adds_error_code_to_existing_db() {
2241        let temp_dir =
2242            std::env::temp_dir().join(format!("soma-studio-task-run-migration-{}", Uuid::new_v4()));
2243        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2244
2245        let config = test_config(&temp_dir);
2246        let db_path = config.db_path.to_str().expect("db path");
2247        let db = Builder::new_local(db_path).build().await.expect("db");
2248        let conn = db.connect().expect("connect");
2249        conn.execute_batch(
2250            r#"
2251            CREATE TABLE workspace_task_runs (
2252                run_id TEXT PRIMARY KEY,
2253                session_id TEXT NOT NULL,
2254                task_id TEXT NOT NULL,
2255                path TEXT NOT NULL,
2256                status TEXT NOT NULL,
2257                command_label TEXT NOT NULL,
2258                exit_code INTEGER,
2259                stdout_tail TEXT NOT NULL,
2260                stderr_tail TEXT NOT NULL,
2261                stdout_truncated INTEGER NOT NULL,
2262                stderr_truncated INTEGER NOT NULL,
2263                timed_out INTEGER NOT NULL,
2264                cancel_requested INTEGER NOT NULL,
2265                started_at_ms INTEGER NOT NULL,
2266                completed_at_ms INTEGER,
2267                duration_ms INTEGER,
2268                error TEXT,
2269                max_output_bytes INTEGER NOT NULL,
2270                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
2271            );
2272            "#,
2273        )
2274        .await
2275        .expect("old workspace task schema");
2276        drop(conn);
2277        drop(db);
2278
2279        let storage = StudioStorage::open(&config)
2280            .await
2281            .expect("migrated storage");
2282        let schema_status = storage.schema_status().await.expect("schema status");
2283        assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
2284        assert!(schema_status.is_current());
2285
2286        let session_id = Uuid::new_v4().to_string();
2287        let run_id = Uuid::new_v4();
2288        storage
2289            .upsert_workspace_task_run(
2290                &session_id,
2291                &WorkspaceTaskRunSummary {
2292                    run_id,
2293                    task_id: WorkspaceTaskId::GitStatus,
2294                    path: ".".to_string(),
2295                    status: WorkspaceTaskRunStatus::Failed,
2296                    command_label: "git status --short --branch".to_string(),
2297                    exit_code: Some(1),
2298                    stdout_tail: String::new(),
2299                    stderr_tail: "failed".to_string(),
2300                    stdout_truncated: false,
2301                    stderr_truncated: false,
2302                    timed_out: false,
2303                    cancel_requested: false,
2304                    started_at_ms: 1_000,
2305                    completed_at_ms: Some(1_010),
2306                    duration_ms: Some(10),
2307                    error: Some("workspace task exited with code 1".to_string()),
2308                    error_code: Some("workspace_task_failed_exit".to_string()),
2309                    max_output_bytes: 64 * 1024,
2310                },
2311            )
2312            .await
2313            .expect("persist migrated task run");
2314
2315        let loaded = storage
2316            .get_workspace_task_run(&session_id, run_id)
2317            .await
2318            .expect("load migrated task run")
2319            .expect("migrated task run exists");
2320        assert_eq!(
2321            loaded.error_code.as_deref(),
2322            Some("workspace_task_failed_exit")
2323        );
2324
2325        let _ = std::fs::remove_dir_all(temp_dir);
2326    }
2327
2328    #[tokio::test]
2329    async fn workspace_file_change_audit_schema_adds_error_code_to_existing_db() {
2330        let temp_dir = std::env::temp_dir().join(format!(
2331            "soma-studio-file-audit-migration-{}",
2332            Uuid::new_v4()
2333        ));
2334        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2335
2336        let config = test_config(&temp_dir);
2337        let db_path = config.db_path.to_str().expect("db path");
2338        let db = Builder::new_local(db_path).build().await.expect("db");
2339        let conn = db.connect().expect("connect");
2340        conn.execute_batch(
2341            r#"
2342            CREATE TABLE workspace_file_change_audits (
2343                audit_id TEXT PRIMARY KEY,
2344                session_id TEXT NOT NULL,
2345                action TEXT NOT NULL,
2346                path TEXT NOT NULL,
2347                target_path TEXT,
2348                status TEXT NOT NULL,
2349                error TEXT,
2350                size_bytes_before INTEGER,
2351                size_bytes_after INTEGER,
2352                created_at_ms INTEGER NOT NULL,
2353                applied_at_ms INTEGER,
2354                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
2355            );
2356            "#,
2357        )
2358        .await
2359        .expect("old workspace file audit schema");
2360        drop(conn);
2361        drop(db);
2362
2363        let storage = StudioStorage::open(&config)
2364            .await
2365            .expect("migrated storage");
2366        let schema_status = storage.schema_status().await.expect("schema status");
2367        assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
2368        assert!(schema_status.is_current());
2369
2370        let session_id = Uuid::new_v4().to_string();
2371        let request = WorkspaceFileChangePreviewRequest {
2372            action: WorkspaceFileChangeAction::RenamePath,
2373            path: "docs/new.md".to_string(),
2374            target_path: Some("docs/renamed.md".to_string()),
2375            content: None,
2376            expected_modified_at_ms: Some(1_000),
2377        };
2378        let started = storage
2379            .start_workspace_file_change_audit(&session_id, &request, Some(10))
2380            .await
2381            .expect("start migrated file audit");
2382        let failed = storage
2383            .finish_workspace_file_change_audit(
2384                &session_id,
2385                started.audit_id,
2386                WorkspaceFileChangeAuditStatus::Failed,
2387                Some("workspace rename target already exists"),
2388                Some("workspace_conflict"),
2389                Some(10),
2390            )
2391            .await
2392            .expect("finish migrated file audit");
2393
2394        assert_eq!(failed.error_code.as_deref(), Some("workspace_conflict"));
2395
2396        let _ = std::fs::remove_dir_all(temp_dir);
2397    }
2398
2399    #[tokio::test]
2400    async fn storage_rejects_newer_schema_version() {
2401        let temp_dir =
2402            std::env::temp_dir().join(format!("soma-studio-newer-schema-{}", Uuid::new_v4()));
2403        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2404
2405        let config = test_config(&temp_dir);
2406        let db_path = config.db_path.to_str().expect("db path");
2407        let db = Builder::new_local(db_path).build().await.expect("db");
2408        let conn = db.connect().expect("connect");
2409        conn.execute(
2410            &format!("PRAGMA user_version = {}", STORAGE_SCHEMA_VERSION + 1),
2411            (),
2412        )
2413        .await
2414        .expect("set newer schema version");
2415        drop(conn);
2416        drop(db);
2417
2418        let error = StudioStorage::open(&config)
2419            .await
2420            .expect_err("newer schema must be rejected");
2421        assert!(
2422            error.to_string().contains("newer than supported version"),
2423            "{error}"
2424        );
2425
2426        let db = Builder::new_local(db_path)
2427            .build()
2428            .await
2429            .expect("db reopen");
2430        let conn = db.connect().expect("connect reopened db");
2431        let mut rows = conn
2432            .query(
2433                "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'source_roots'",
2434                (),
2435            )
2436            .await
2437            .expect("query table count");
2438        let row = rows
2439            .next()
2440            .await
2441            .expect("read table count")
2442            .expect("table count row");
2443        let table_count: i64 = row.get(0).expect("decode table count");
2444        assert_eq!(table_count, 0);
2445        drop(rows);
2446        drop(conn);
2447        drop(db);
2448
2449        let _ = std::fs::remove_dir_all(temp_dir);
2450    }
2451
2452    #[tokio::test]
2453    async fn workspace_history_retention_limits_rows_per_session() {
2454        let temp_dir =
2455            std::env::temp_dir().join(format!("soma-studio-history-retention-{}", Uuid::new_v4()));
2456        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2457
2458        let config = test_config(&temp_dir);
2459        let storage = StudioStorage::open(&config).await.expect("storage");
2460        let session_id = Uuid::new_v4().to_string();
2461        let request = WorkspaceFileChangePreviewRequest {
2462            action: WorkspaceFileChangeAction::DeleteFile,
2463            path: "docs/old.md".to_string(),
2464            target_path: None,
2465            content: None,
2466            expected_modified_at_ms: Some(1_000),
2467        };
2468
2469        for index in 0..205 {
2470            storage
2471                .upsert_workspace_task_run(
2472                    &session_id,
2473                    &WorkspaceTaskRunSummary {
2474                        run_id: Uuid::new_v4(),
2475                        task_id: WorkspaceTaskId::GitStatus,
2476                        path: ".".to_string(),
2477                        status: WorkspaceTaskRunStatus::Complete,
2478                        command_label: "git status --short --branch".to_string(),
2479                        exit_code: Some(0),
2480                        stdout_tail: String::new(),
2481                        stderr_tail: String::new(),
2482                        stdout_truncated: false,
2483                        stderr_truncated: false,
2484                        timed_out: false,
2485                        cancel_requested: false,
2486                        started_at_ms: index,
2487                        completed_at_ms: Some(index + 1),
2488                        duration_ms: Some(1),
2489                        error: None,
2490                        error_code: None,
2491                        max_output_bytes: 64 * 1024,
2492                    },
2493                )
2494                .await
2495                .expect("persist retained task run");
2496            storage
2497                .start_workspace_file_change_audit(&session_id, &request, Some(index))
2498                .await
2499                .expect("persist retained file audit");
2500        }
2501
2502        let task_run_count = count_rows(
2503            &storage,
2504            "SELECT COUNT(*) FROM workspace_task_runs WHERE session_id = ?1",
2505            &session_id,
2506        )
2507        .await;
2508        let file_audit_count = count_rows(
2509            &storage,
2510            "SELECT COUNT(*) FROM workspace_file_change_audits WHERE session_id = ?1",
2511            &session_id,
2512        )
2513        .await;
2514
2515        assert_eq!(task_run_count, WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64);
2516        assert_eq!(
2517            file_audit_count,
2518            WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64
2519        );
2520
2521        let _ = std::fs::remove_dir_all(temp_dir);
2522    }
2523
2524    #[tokio::test]
2525    async fn conversation_and_message_roundtrip_persists() {
2526        let temp_dir =
2527            std::env::temp_dir().join(format!("soma-studio-conversation-{}", Uuid::new_v4()));
2528        std::fs::create_dir_all(&temp_dir).expect("temp dir");
2529
2530        let config = test_config(&temp_dir);
2531        let storage = StudioStorage::open(&config).await.expect("storage");
2532
2533        let conversation = storage
2534            .create_conversation("session-a", None)
2535            .await
2536            .expect("conversation");
2537        assert_eq!(conversation.title, DEFAULT_CONVERSATION_TITLE);
2538
2539        let user_message = storage
2540            .create_message(&NewConversationMessage {
2541                conversation_id: conversation.id.clone(),
2542                role: "user".to_string(),
2543                content: "Discuss the ingest milestone split".to_string(),
2544                status: "complete".to_string(),
2545                provider: None,
2546                model_id: None,
2547            })
2548            .await
2549            .expect("user message");
2550        let assistant_message = storage
2551            .create_message(&NewConversationMessage {
2552                conversation_id: conversation.id.clone(),
2553                role: "assistant".to_string(),
2554                content: String::new(),
2555                status: "streaming".to_string(),
2556                provider: Some("ollama".to_string()),
2557                model_id: Some("qwen3:8b".to_string()),
2558            })
2559            .await
2560            .expect("assistant placeholder");
2561        let partial_assistant = storage
2562            .append_message_delta(&assistant_message.id, "Start with conversation ")
2563            .await
2564            .expect("assistant delta");
2565        let completed_assistant = storage
2566            .update_message_content(
2567                &assistant_message.id,
2568                "Start with conversation persistence and restore APIs.",
2569                "complete",
2570            )
2571            .await
2572            .expect("assistant completion");
2573
2574        let conversations = storage
2575            .list_conversations("session-a")
2576            .await
2577            .expect("conversations");
2578        let messages = storage
2579            .list_conversation_messages("session-a", &conversation.id)
2580            .await
2581            .expect("messages");
2582
2583        assert_eq!(conversations.len(), 1);
2584        assert_eq!(conversations[0].id, conversation.id);
2585        assert_eq!(conversations[0].title, "Discuss the ingest milestone split");
2586        assert_eq!(messages.len(), 2);
2587        assert_eq!(messages[0].id, user_message.id);
2588        assert_eq!(messages[1].id, assistant_message.id);
2589        assert_eq!(partial_assistant.content, "Start with conversation ");
2590        assert_eq!(partial_assistant.status, "streaming");
2591        assert_eq!(completed_assistant.status, "complete");
2592        assert_eq!(completed_assistant.provider.as_deref(), Some("ollama"));
2593        assert!(
2594            storage
2595                .list_conversations("session-b")
2596                .await
2597                .expect("session-b conversations")
2598                .is_empty()
2599        );
2600        assert!(
2601            storage
2602                .list_conversation_messages("session-b", &conversation.id)
2603                .await
2604                .expect("session-b messages")
2605                .is_empty()
2606        );
2607
2608        let deleted = storage
2609            .delete_conversation("session-a", &conversation.id)
2610            .await
2611            .expect("delete conversation");
2612        let messages_after_delete = storage
2613            .list_conversation_messages("session-a", &conversation.id)
2614            .await
2615            .expect("messages after delete");
2616
2617        assert!(deleted);
2618        assert!(messages_after_delete.is_empty());
2619        assert!(
2620            storage
2621                .list_conversations("session-a")
2622                .await
2623                .expect("conversations after delete")
2624                .is_empty()
2625        );
2626
2627        let _ = std::fs::remove_dir_all(temp_dir);
2628    }
2629
2630    fn test_config(temp_dir: &std::path::Path) -> AppConfig {
2631        AppConfig {
2632            app_name: "Soma Studio".to_string(),
2633            bind_addr: "127.0.0.1:0".to_string(),
2634            project_root: temp_dir.to_path_buf(),
2635            data_dir: temp_dir.to_path_buf(),
2636            derived_dir: temp_dir.join("derived"),
2637            notebook_dir: temp_dir.join("notebook"),
2638            user_assets_dir: temp_dir.join("assets"),
2639            db_path: temp_dir.join("test.db"),
2640            web_build_dir: PathBuf::from("unused"),
2641            web_shell_file: PathBuf::from("unused/spa.html"),
2642        }
2643    }
2644
2645    async fn count_rows(storage: &StudioStorage, sql: &str, session_id: &str) -> i64 {
2646        let mut rows = storage
2647            .conn
2648            .query(sql, [session_id])
2649            .await
2650            .expect("count query");
2651        let row = rows
2652            .next()
2653            .await
2654            .expect("count row read")
2655            .expect("count row");
2656        row.get(0).expect("count value")
2657    }
2658}