use anyhow::{Context, Result, bail};
use soma_studio_core::{
AppConfig, ConversationMessage, ConversationSummary, IngestJobSummary, IngestStatusResponse,
ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
WorkspaceFileChangeAuditEntry, WorkspaceFileChangeAuditStatus,
WorkspaceFileChangePreviewRequest, WorkspaceTaskId, WorkspaceTaskRunStatus,
WorkspaceTaskRunSummary,
};
use std::time::{SystemTime, UNIX_EPOCH};
use turso::{Builder, Connection, Row};
use uuid::Uuid;
const DEFAULT_CONVERSATION_TITLE: &str = "New conversation";
const CONVERSATION_TITLE_LIMIT: usize = 60;
const WORKSPACE_HISTORY_RETAIN_PER_SESSION: usize = 200;
pub const STORAGE_SCHEMA_VERSION: i64 = 3;
#[derive(Debug, Clone)]
pub struct StudioStorage {
conn: Connection,
}
#[derive(Debug, Clone)]
pub struct NewConversationMessage {
pub conversation_id: String,
pub role: String,
pub content: String,
pub status: String,
pub provider: Option<String>,
pub model_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct IndexedSourceFileRow {
pub source_root_id: String,
pub relative_path: String,
pub absolute_path: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageSchemaStatus {
pub current_version: i64,
pub expected_version: i64,
}
impl StorageSchemaStatus {
pub fn is_current(&self) -> bool {
self.current_version == self.expected_version
}
}
impl StudioStorage {
pub async fn open(config: &AppConfig) -> Result<Self> {
let db_path = config
.db_path
.to_str()
.with_context(|| format!("invalid db path {}", config.db_path.display()))?;
let db = Builder::new_local(db_path).build().await.with_context(|| {
format!(
"failed to open local Turso db at {}",
config.db_path.display()
)
})?;
let conn = db
.connect()
.context("failed to connect to local Turso db")?;
let storage = Self { conn };
storage.reject_newer_schema_version().await?;
storage.initialize_schema().await?;
Ok(storage)
}
async fn initialize_schema(&self) -> Result<()> {
self.conn
.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS source_roots (
path TEXT PRIMARY KEY,
id TEXT NOT NULL,
read_only INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS provider_status (
provider TEXT PRIMARY KEY,
last_test_ok INTEGER,
last_test_detail TEXT,
last_tested_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS provider_selection (
singleton INTEGER PRIMARY KEY CHECK (singleton = 1),
provider TEXT,
model_id TEXT,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
title TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_message_at TEXT
);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
provider TEXT,
model_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_conversations_recent
ON conversations (session_id, last_message_at DESC, updated_at DESC, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_conversation_created
ON messages (conversation_id, sequence);
CREATE TABLE IF NOT EXISTS source_files (
source_root_id TEXT NOT NULL,
relative_path TEXT NOT NULL,
absolute_path TEXT NOT NULL,
fingerprint TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
modified_at TEXT NOT NULL,
file_type TEXT NOT NULL,
status TEXT NOT NULL,
last_error TEXT,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (source_root_id, relative_path)
);
CREATE TABLE IF NOT EXISTS ingest_jobs (
id TEXT PRIMARY KEY,
source_root_id TEXT,
scope_label TEXT NOT NULL,
status TEXT NOT NULL,
file_count INTEGER NOT NULL DEFAULT 0,
started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
completed_at TEXT,
last_error TEXT
);
CREATE INDEX IF NOT EXISTS idx_ingest_jobs_started_at
ON ingest_jobs (started_at DESC);
CREATE TABLE IF NOT EXISTS workspace_task_runs (
run_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
task_id TEXT NOT NULL,
path TEXT NOT NULL,
status TEXT NOT NULL,
command_label TEXT NOT NULL,
exit_code INTEGER,
stdout_tail TEXT NOT NULL,
stderr_tail TEXT NOT NULL,
stdout_truncated INTEGER NOT NULL,
stderr_truncated INTEGER NOT NULL,
timed_out INTEGER NOT NULL,
cancel_requested INTEGER NOT NULL,
started_at_ms INTEGER NOT NULL,
completed_at_ms INTEGER,
duration_ms INTEGER,
error TEXT,
error_code TEXT,
max_output_bytes INTEGER NOT NULL,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_workspace_task_runs_recent
ON workspace_task_runs (session_id, started_at_ms DESC);
CREATE TABLE IF NOT EXISTS workspace_file_change_audits (
audit_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
action TEXT NOT NULL,
path TEXT NOT NULL,
target_path TEXT,
status TEXT NOT NULL,
error TEXT,
error_code TEXT,
size_bytes_before INTEGER,
size_bytes_after INTEGER,
created_at_ms INTEGER NOT NULL,
applied_at_ms INTEGER,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_workspace_file_change_audits_recent
ON workspace_file_change_audits (session_id, created_at_ms DESC);
"#,
)
.await
.context("failed to initialize Turso schema")?;
self.run_schema_migrations().await?;
self.mark_interrupted_workspace_task_runs().await?;
self.mark_interrupted_workspace_file_change_audits().await?;
Ok(())
}
async fn run_schema_migrations(&self) -> Result<()> {
let current_version = self.schema_user_version().await?;
if current_version > STORAGE_SCHEMA_VERSION {
bail!(
"database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
);
}
if current_version < 2 {
self.ensure_workspace_task_runs_error_code_column().await?;
}
if current_version < 3 {
self.ensure_workspace_file_change_audits_error_code_column()
.await?;
}
if current_version != STORAGE_SCHEMA_VERSION {
self.set_schema_user_version(STORAGE_SCHEMA_VERSION).await?;
}
Ok(())
}
async fn reject_newer_schema_version(&self) -> Result<()> {
let current_version = self.schema_user_version().await?;
if current_version > STORAGE_SCHEMA_VERSION {
bail!(
"database schema version {current_version} is newer than supported version {STORAGE_SCHEMA_VERSION}"
);
}
Ok(())
}
async fn schema_user_version(&self) -> Result<i64> {
let mut rows = self
.conn
.query("PRAGMA user_version", ())
.await
.context("failed to inspect database schema version")?;
let Some(row) = rows
.next()
.await
.context("failed to read database schema version row")?
else {
bail!("database schema version pragma returned no rows");
};
row.get(0)
.context("failed to decode database schema version")
}
async fn set_schema_user_version(&self, version: i64) -> Result<()> {
self.conn
.execute(&format!("PRAGMA user_version = {version}"), ())
.await
.with_context(|| format!("failed to set database schema version to {version}"))?;
Ok(())
}
pub async fn schema_status(&self) -> Result<StorageSchemaStatus> {
Ok(StorageSchemaStatus {
current_version: self.schema_user_version().await?,
expected_version: STORAGE_SCHEMA_VERSION,
})
}
async fn ensure_workspace_task_runs_error_code_column(&self) -> Result<()> {
let mut rows = self
.conn
.query("PRAGMA table_info(workspace_task_runs)", ())
.await
.context("failed to inspect workspace task run schema")?;
while let Some(row) = rows
.next()
.await
.context("failed to read workspace task run schema row")?
{
let name: String = row
.get(1)
.context("failed to decode workspace task run column name")?;
if name == "error_code" {
return Ok(());
}
}
drop(rows);
self.conn
.execute(
"ALTER TABLE workspace_task_runs ADD COLUMN error_code TEXT",
(),
)
.await
.context("failed to add workspace task run error_code column")?;
Ok(())
}
async fn ensure_workspace_file_change_audits_error_code_column(&self) -> Result<()> {
let mut rows = self
.conn
.query("PRAGMA table_info(workspace_file_change_audits)", ())
.await
.context("failed to inspect workspace file change audit schema")?;
while let Some(row) = rows
.next()
.await
.context("failed to read workspace file change audit schema row")?
{
let name: String = row
.get(1)
.context("failed to decode workspace file change audit column name")?;
if name == "error_code" {
return Ok(());
}
}
drop(rows);
self.conn
.execute(
"ALTER TABLE workspace_file_change_audits ADD COLUMN error_code TEXT",
(),
)
.await
.context("failed to add workspace file change audit error_code column")?;
Ok(())
}
async fn mark_interrupted_workspace_task_runs(&self) -> Result<()> {
let now_ms = unix_epoch_ms();
self.conn
.execute(
r#"
UPDATE workspace_task_runs
SET status = 'failed',
completed_at_ms = COALESCE(completed_at_ms, ?1),
duration_ms = COALESCE(duration_ms, MAX(0, ?1 - started_at_ms)),
error = COALESCE(error, 'workspace task interrupted before completion'),
error_code = COALESCE(error_code, 'workspace_task_interrupted'),
updated_at = CURRENT_TIMESTAMP
WHERE status IN ('queued', 'running')
"#,
[u64_to_i64(now_ms, "workspace task interrupted_at_ms")?],
)
.await
.context("failed to mark interrupted workspace task runs")?;
Ok(())
}
async fn mark_interrupted_workspace_file_change_audits(&self) -> Result<()> {
let now_ms = unix_epoch_ms();
self.conn
.execute(
r#"
UPDATE workspace_file_change_audits
SET status = 'failed',
applied_at_ms = COALESCE(applied_at_ms, ?1),
error = COALESCE(error, 'workspace file change interrupted before completion'),
error_code = COALESCE(error_code, 'workspace_file_change_interrupted'),
updated_at = CURRENT_TIMESTAMP
WHERE status = 'applying'
"#,
[u64_to_i64(
now_ms,
"workspace file change interrupted_at_ms",
)?],
)
.await
.context("failed to mark interrupted workspace file change audits")?;
Ok(())
}
pub async fn list_source_roots(&self) -> Result<Vec<SourceRootSummary>> {
let mut rows = self
.conn
.query(
"SELECT id, path, read_only FROM source_roots ORDER BY path",
(),
)
.await
.context("failed to query source_roots")?;
let mut source_roots = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read source_root row")?
{
let id: String = row.get(0).context("failed to decode source_root id")?;
let path: String = row.get(1).context("failed to decode source_root path")?;
let read_only: i64 = row
.get(2)
.context("failed to decode source_root read_only")?;
source_roots.push(SourceRootSummary {
id: Uuid::parse_str(&id)
.with_context(|| format!("invalid source_root UUID {id}"))?,
path,
read_only: read_only != 0,
});
}
Ok(source_roots)
}
pub async fn upsert_source_root(
&self,
summary: &SourceRootSummary,
) -> Result<SourceRootSummary> {
self.conn
.execute(
"INSERT OR IGNORE INTO source_roots (path, id, read_only) VALUES (?1, ?2, ?3)",
(
summary.path.clone(),
summary.id.to_string(),
if summary.read_only { 1_i64 } else { 0_i64 },
),
)
.await
.with_context(|| format!("failed to persist source root {}", summary.path))?;
self.find_source_root(&summary.path)
.await?
.with_context(|| format!("persisted source root missing for {}", summary.path))
}
async fn find_source_root(&self, path: &str) -> Result<Option<SourceRootSummary>> {
let mut stmt = self
.conn
.prepare("SELECT id, path, read_only FROM source_roots WHERE path = ?1 LIMIT 1")
.await
.context("failed to prepare source_root lookup")?;
let mut rows = stmt
.query([path])
.await
.with_context(|| format!("failed to lookup source root {path}"))?;
let Some(row) = rows.next().await.context("failed to read lookup row")? else {
return Ok(None);
};
let id: String = row.get(0).context("failed to decode lookup id")?;
let stored_path: String = row.get(1).context("failed to decode lookup path")?;
let read_only: i64 = row.get(2).context("failed to decode lookup read_only")?;
Ok(Some(SourceRootSummary {
id: Uuid::parse_str(&id).with_context(|| format!("invalid persisted UUID {id}"))?,
path: stored_path,
read_only: read_only != 0,
}))
}
pub async fn record_provider_test(&self, provider: &str, ok: bool, detail: &str) -> Result<()> {
self.conn
.execute(
r#"
INSERT INTO provider_status (provider, last_test_ok, last_test_detail, last_tested_at)
VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP)
ON CONFLICT(provider) DO UPDATE SET
last_test_ok = excluded.last_test_ok,
last_test_detail = excluded.last_test_detail,
last_tested_at = CURRENT_TIMESTAMP
"#,
(provider.to_string(), if ok { 1_i64 } else { 0_i64 }, detail.to_string()),
)
.await
.with_context(|| format!("failed to persist provider status for {provider}"))?;
Ok(())
}
pub async fn list_provider_statuses(&self) -> Result<Vec<ProviderStatusRow>> {
let mut rows = self
.conn
.query(
"SELECT provider, last_test_ok, last_test_detail, last_tested_at FROM provider_status",
(),
)
.await
.context("failed to query provider_status")?;
let mut statuses = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read provider_status row")?
{
let provider: String = row.get(0).context("failed to decode provider name")?;
let last_test_ok: Option<i64> = row.get(1).ok();
let last_test_detail: Option<String> = row.get(2).ok();
let last_tested_at: String =
row.get(3).context("failed to decode provider timestamp")?;
statuses.push(ProviderStatusRow {
provider,
last_test_ok: last_test_ok.map(|value| value != 0),
last_test_detail,
last_tested_at: Some(last_tested_at),
});
}
Ok(statuses)
}
pub async fn load_provider_selection(&self) -> Result<ProviderSelectionResponse> {
let mut rows = self
.conn
.query(
"SELECT provider, model_id FROM provider_selection WHERE singleton = 1",
(),
)
.await
.context("failed to query provider_selection")?;
let Some(row) = rows
.next()
.await
.context("failed to read provider_selection row")?
else {
return Ok(ProviderSelectionResponse {
selected_provider: None,
selected_model_id: None,
});
};
let selected_provider: Option<String> = row.get(0).ok();
let selected_model_id: Option<String> = row.get(1).ok();
Ok(ProviderSelectionResponse {
selected_provider,
selected_model_id,
})
}
pub async fn save_provider_selection(
&self,
selection: &ProviderSelectionResponse,
) -> Result<ProviderSelectionResponse> {
self.conn
.execute(
r#"
INSERT INTO provider_selection (singleton, provider, model_id, updated_at)
VALUES (1, ?1, ?2, CURRENT_TIMESTAMP)
ON CONFLICT(singleton) DO UPDATE SET
provider = excluded.provider,
model_id = excluded.model_id,
updated_at = CURRENT_TIMESTAMP
"#,
(
selection.selected_provider.clone(),
selection.selected_model_id.clone(),
),
)
.await
.context("failed to persist provider selection")?;
self.load_provider_selection().await
}
pub async fn persist_session(&self, session_id: &str) -> Result<()> {
self.conn
.execute(
"INSERT OR IGNORE INTO sessions (id) VALUES (?1)",
[session_id],
)
.await
.with_context(|| format!("failed to persist session {session_id}"))?;
Ok(())
}
pub async fn session_exists(&self, session_id: &str) -> Result<bool> {
let mut stmt = self
.conn
.prepare("SELECT id FROM sessions WHERE id = ?1 LIMIT 1")
.await
.context("failed to prepare session lookup")?;
let mut rows = stmt
.query([session_id])
.await
.with_context(|| format!("failed to lookup session {session_id}"))?;
Ok(rows
.next()
.await
.context("failed to read session row")?
.is_some())
}
pub async fn upsert_workspace_task_run(
&self,
session_id: &str,
summary: &WorkspaceTaskRunSummary,
) -> Result<()> {
self.conn
.execute(
r#"
INSERT INTO workspace_task_runs (
run_id, session_id, task_id, path, status, command_label, exit_code,
stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
error_code, max_output_bytes, updated_at
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, CURRENT_TIMESTAMP)
ON CONFLICT(run_id) DO UPDATE SET
task_id = excluded.task_id,
path = excluded.path,
status = excluded.status,
command_label = excluded.command_label,
exit_code = excluded.exit_code,
stdout_tail = excluded.stdout_tail,
stderr_tail = excluded.stderr_tail,
stdout_truncated = excluded.stdout_truncated,
stderr_truncated = excluded.stderr_truncated,
timed_out = excluded.timed_out,
cancel_requested = excluded.cancel_requested,
started_at_ms = excluded.started_at_ms,
completed_at_ms = excluded.completed_at_ms,
duration_ms = excluded.duration_ms,
error = excluded.error,
error_code = excluded.error_code,
max_output_bytes = excluded.max_output_bytes,
updated_at = CURRENT_TIMESTAMP
"#,
turso::params![
summary.run_id.to_string(),
session_id.to_string(),
workspace_task_id_label(summary.task_id).to_string(),
summary.path.clone(),
workspace_task_run_status_label(summary.status).to_string(),
summary.command_label.clone(),
summary.exit_code.map(i64::from),
summary.stdout_tail.clone(),
summary.stderr_tail.clone(),
bool_to_i64(summary.stdout_truncated),
bool_to_i64(summary.stderr_truncated),
bool_to_i64(summary.timed_out),
bool_to_i64(summary.cancel_requested),
u64_to_i64(summary.started_at_ms, "workspace task started_at_ms")?,
summary
.completed_at_ms
.map(|value| u64_to_i64(value, "workspace task completed_at_ms"))
.transpose()?,
summary
.duration_ms
.map(|value| u64_to_i64(value, "workspace task duration_ms"))
.transpose()?,
summary.error.clone(),
summary.error_code.clone(),
usize_to_i64(summary.max_output_bytes, "workspace task max_output_bytes")?,
],
)
.await
.with_context(|| format!("failed to persist workspace task run {}", summary.run_id))?;
self.prune_workspace_task_run_history(session_id, WORKSPACE_HISTORY_RETAIN_PER_SESSION)
.await?;
Ok(())
}
pub async fn list_workspace_task_runs(
&self,
session_id: &str,
limit: usize,
error_code: Option<&str>,
) -> Result<Vec<WorkspaceTaskRunSummary>> {
let limit = usize_to_i64(limit.clamp(1, 200), "workspace task run list limit")?;
let error_code = error_code.map(str::to_string);
let mut rows = self
.conn
.query(
r#"
SELECT run_id, task_id, path, status, command_label, exit_code,
stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
error_code, max_output_bytes
FROM workspace_task_runs
WHERE session_id = ?1
AND (?2 IS NULL OR error_code = ?2)
ORDER BY started_at_ms DESC, run_id DESC
LIMIT ?3
"#,
(session_id.to_string(), error_code, limit),
)
.await
.context("failed to query workspace task runs")?;
let mut summaries = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read workspace task run row")?
{
summaries.push(decode_workspace_task_run_summary(&row)?);
}
Ok(summaries)
}
pub async fn get_workspace_task_run(
&self,
session_id: &str,
run_id: Uuid,
) -> Result<Option<WorkspaceTaskRunSummary>> {
let mut stmt = self
.conn
.prepare(
r#"
SELECT run_id, task_id, path, status, command_label, exit_code,
stdout_tail, stderr_tail, stdout_truncated, stderr_truncated, timed_out,
cancel_requested, started_at_ms, completed_at_ms, duration_ms, error,
error_code, max_output_bytes
FROM workspace_task_runs
WHERE session_id = ?1 AND run_id = ?2
LIMIT 1
"#,
)
.await
.context("failed to prepare workspace task run lookup")?;
let mut rows = stmt
.query((session_id.to_string(), run_id.to_string()))
.await
.with_context(|| format!("failed to lookup workspace task run {run_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read workspace task run lookup row")?
else {
return Ok(None);
};
Ok(Some(decode_workspace_task_run_summary(&row)?))
}
pub async fn start_workspace_file_change_audit(
&self,
session_id: &str,
request: &WorkspaceFileChangePreviewRequest,
size_bytes_before: Option<u64>,
) -> Result<WorkspaceFileChangeAuditEntry> {
let audit_id = Uuid::new_v4();
let created_at_ms = unix_epoch_ms();
self.conn
.execute(
r#"
INSERT INTO workspace_file_change_audits (
audit_id, session_id, action, path, target_path, status, error,
error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms, updated_at
)
VALUES (?1, ?2, ?3, ?4, ?5, 'applying', NULL, NULL, ?6, NULL, ?7, NULL, CURRENT_TIMESTAMP)
"#,
turso::params![
audit_id.to_string(),
session_id.to_string(),
workspace_file_change_action_label(request.action).to_string(),
request.path.clone(),
request.target_path.clone(),
size_bytes_before
.map(|value| u64_to_i64(value, "workspace file change size_bytes_before"))
.transpose()?,
u64_to_i64(created_at_ms, "workspace file change created_at_ms")?,
],
)
.await
.with_context(|| {
format!(
"failed to start workspace file change audit for {}",
request.path
)
})?;
self.prune_workspace_file_change_audit_history(
session_id,
WORKSPACE_HISTORY_RETAIN_PER_SESSION,
)
.await?;
self.find_workspace_file_change_audit(session_id, audit_id)
.await?
.with_context(|| format!("created workspace file change audit missing for {audit_id}"))
}
pub async fn finish_workspace_file_change_audit(
&self,
session_id: &str,
audit_id: Uuid,
status: WorkspaceFileChangeAuditStatus,
error: Option<&str>,
error_code: Option<&str>,
size_bytes_after: Option<u64>,
) -> Result<WorkspaceFileChangeAuditEntry> {
if matches!(status, WorkspaceFileChangeAuditStatus::Applying) {
anyhow::bail!("workspace file change audit finish status must be terminal");
}
let applied_at_ms = unix_epoch_ms();
self.conn
.execute(
r#"
UPDATE workspace_file_change_audits
SET status = ?3,
error = ?4,
error_code = ?5,
size_bytes_after = ?6,
applied_at_ms = ?7,
updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?1 AND audit_id = ?2
"#,
turso::params![
session_id.to_string(),
audit_id.to_string(),
workspace_file_change_audit_status_label(status).to_string(),
error.map(str::to_string),
error_code.map(str::to_string),
size_bytes_after
.map(|value| u64_to_i64(value, "workspace file change size_bytes_after"))
.transpose()?,
u64_to_i64(applied_at_ms, "workspace file change applied_at_ms")?,
],
)
.await
.with_context(|| format!("failed to finish workspace file change audit {audit_id}"))?;
self.find_workspace_file_change_audit(session_id, audit_id)
.await?
.with_context(|| format!("finished workspace file change audit missing for {audit_id}"))
}
pub async fn list_workspace_file_change_audits(
&self,
session_id: &str,
limit: usize,
error_code: Option<&str>,
) -> Result<Vec<WorkspaceFileChangeAuditEntry>> {
let limit = usize_to_i64(
limit.clamp(1, 200),
"workspace file change audit list limit",
)?;
let error_code = error_code.map(str::to_string);
let mut rows = self
.conn
.query(
r#"
SELECT audit_id, action, path, target_path, status, error,
error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
FROM workspace_file_change_audits
WHERE session_id = ?1
AND (?2 IS NULL OR error_code = ?2)
ORDER BY created_at_ms DESC, audit_id DESC
LIMIT ?3
"#,
(session_id.to_string(), error_code, limit),
)
.await
.context("failed to query workspace file change audits")?;
let mut entries = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read workspace file change audit row")?
{
entries.push(decode_workspace_file_change_audit_entry(&row)?);
}
Ok(entries)
}
async fn find_workspace_file_change_audit(
&self,
session_id: &str,
audit_id: Uuid,
) -> Result<Option<WorkspaceFileChangeAuditEntry>> {
let mut stmt = self
.conn
.prepare(
r#"
SELECT audit_id, action, path, target_path, status, error,
error_code, size_bytes_before, size_bytes_after, created_at_ms, applied_at_ms
FROM workspace_file_change_audits
WHERE session_id = ?1 AND audit_id = ?2
LIMIT 1
"#,
)
.await
.context("failed to prepare workspace file change audit lookup")?;
let mut rows = stmt
.query((session_id.to_string(), audit_id.to_string()))
.await
.with_context(|| format!("failed to lookup workspace file change audit {audit_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read workspace file change audit lookup row")?
else {
return Ok(None);
};
Ok(Some(decode_workspace_file_change_audit_entry(&row)?))
}
async fn prune_workspace_task_run_history(
&self,
session_id: &str,
retain_limit: usize,
) -> Result<()> {
let retain_limit = usize_to_i64(retain_limit.max(1), "workspace task retention limit")?;
self.conn
.execute(
r#"
DELETE FROM workspace_task_runs
WHERE session_id = ?1
AND run_id NOT IN (
SELECT run_id
FROM workspace_task_runs
WHERE session_id = ?1
ORDER BY started_at_ms DESC, run_id DESC
LIMIT ?2
)
"#,
(session_id.to_string(), retain_limit),
)
.await
.with_context(|| format!("failed to prune workspace task history for {session_id}"))?;
Ok(())
}
async fn prune_workspace_file_change_audit_history(
&self,
session_id: &str,
retain_limit: usize,
) -> Result<()> {
let retain_limit =
usize_to_i64(retain_limit.max(1), "workspace file audit retention limit")?;
self.conn
.execute(
r#"
DELETE FROM workspace_file_change_audits
WHERE session_id = ?1
AND audit_id NOT IN (
SELECT audit_id
FROM workspace_file_change_audits
WHERE session_id = ?1
ORDER BY created_at_ms DESC, audit_id DESC
LIMIT ?2
)
"#,
(session_id.to_string(), retain_limit),
)
.await
.with_context(|| {
format!("failed to prune workspace file change audit history for {session_id}")
})?;
Ok(())
}
pub async fn create_conversation(
&self,
session_id: &str,
title: Option<&str>,
) -> Result<ConversationSummary> {
let conversation_id = Uuid::new_v4().to_string();
self.insert_conversation(session_id, &conversation_id, title)
.await?;
self.find_conversation(session_id, &conversation_id)
.await?
.with_context(|| format!("persisted conversation missing for {conversation_id}"))
}
pub async fn ensure_conversation(
&self,
session_id: &str,
conversation_id: &str,
title_hint: Option<&str>,
) -> Result<ConversationSummary> {
self.insert_conversation(session_id, conversation_id, title_hint)
.await?;
self.find_conversation(session_id, conversation_id)
.await?
.with_context(|| format!("persisted conversation missing for {conversation_id}"))
}
pub async fn list_conversations(&self, session_id: &str) -> Result<Vec<ConversationSummary>> {
let mut rows = self
.conn
.query(
r#"
SELECT id, title, created_at, updated_at, last_message_at
FROM conversations
WHERE session_id = ?1
ORDER BY COALESCE(last_message_at, updated_at, created_at) DESC, id DESC
"#,
[session_id],
)
.await
.context("failed to query conversations")?;
let mut conversations = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read conversation row")?
{
conversations.push(decode_conversation_row(&row)?);
}
Ok(conversations)
}
pub async fn find_conversation(
&self,
session_id: &str,
conversation_id: &str,
) -> Result<Option<ConversationSummary>> {
let mut stmt = self
.conn
.prepare(
r#"
SELECT id, title, created_at, updated_at, last_message_at
FROM conversations
WHERE session_id = ?1 AND id = ?2
LIMIT 1
"#,
)
.await
.context("failed to prepare conversation lookup")?;
let mut rows = stmt
.query((session_id.to_string(), conversation_id.to_string()))
.await
.with_context(|| format!("failed to lookup conversation {conversation_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read conversation lookup row")?
else {
return Ok(None);
};
Ok(Some(decode_conversation_row(&row)?))
}
pub async fn delete_conversation(
&self,
session_id: &str,
conversation_id: &str,
) -> Result<bool> {
if self
.find_conversation(session_id, conversation_id)
.await?
.is_none()
{
return Ok(false);
}
self.conn
.execute(
"DELETE FROM messages WHERE conversation_id = ?1",
[conversation_id],
)
.await
.with_context(|| {
format!("failed to delete messages for conversation {conversation_id}")
})?;
let deleted = self
.conn
.execute("DELETE FROM conversations WHERE id = ?1", [conversation_id])
.await
.with_context(|| format!("failed to delete conversation {conversation_id}"))?;
Ok(deleted > 0)
}
pub async fn list_conversation_messages(
&self,
session_id: &str,
conversation_id: &str,
) -> Result<Vec<ConversationMessage>> {
if self
.find_conversation(session_id, conversation_id)
.await?
.is_none()
{
return Ok(Vec::new());
}
let mut stmt = self
.conn
.prepare(
r#"
SELECT id, conversation_id, role, content, status, created_at, provider, model_id
FROM messages
WHERE conversation_id = ?1
ORDER BY sequence ASC, id ASC
"#,
)
.await
.context("failed to prepare message list query")?;
let mut rows = stmt.query([conversation_id]).await.with_context(|| {
format!("failed to list messages for conversation {conversation_id}")
})?;
let mut messages = Vec::new();
while let Some(row) = rows.next().await.context("failed to read message row")? {
messages.push(decode_message_row(&row)?);
}
Ok(messages)
}
pub async fn create_message(
&self,
message: &NewConversationMessage,
) -> Result<ConversationMessage> {
let message_id = Uuid::new_v4().to_string();
let sequence = self.next_message_sequence(&message.conversation_id).await?;
self.conn
.execute(
r#"
INSERT INTO messages (id, conversation_id, sequence, role, content, status, provider, model_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
"#,
(
message_id.clone(),
message.conversation_id.clone(),
sequence,
message.role.clone(),
message.content.clone(),
message.status.clone(),
message.provider.clone(),
message.model_id.clone(),
),
)
.await
.with_context(|| format!("failed to persist message for conversation {}", message.conversation_id))?;
self.touch_conversation_activity(
&message.conversation_id,
message.role.eq("user").then_some(message.content.as_str()),
)
.await?;
self.find_message(&message_id)
.await?
.with_context(|| format!("persisted message missing for {message_id}"))
}
pub async fn update_message_content(
&self,
message_id: &str,
content: &str,
status: &str,
) -> Result<ConversationMessage> {
let conversation_id = self
.message_conversation_id(message_id)
.await?
.with_context(|| format!("message not found for update: {message_id}"))?;
self.conn
.execute(
"UPDATE messages SET content = ?2, status = ?3 WHERE id = ?1",
(
message_id.to_string(),
content.to_string(),
status.to_string(),
),
)
.await
.with_context(|| format!("failed to update message {message_id}"))?;
self.touch_conversation_activity(&conversation_id, None)
.await?;
self.find_message(message_id)
.await?
.with_context(|| format!("updated message missing for {message_id}"))
}
pub async fn append_message_delta(
&self,
message_id: &str,
delta: &str,
) -> Result<ConversationMessage> {
let conversation_id = self
.message_conversation_id(message_id)
.await?
.with_context(|| format!("message not found for delta append: {message_id}"))?;
self.conn
.execute(
"UPDATE messages SET content = content || ?2, status = 'streaming' WHERE id = ?1",
(message_id.to_string(), delta.to_string()),
)
.await
.with_context(|| format!("failed to append delta to message {message_id}"))?;
self.touch_conversation_activity(&conversation_id, None)
.await?;
self.find_message(message_id)
.await?
.with_context(|| format!("delta-updated message missing for {message_id}"))
}
pub async fn replace_source_files(
&self,
source_root_id: &str,
files: &[crate::ingest::ScannedSourceFile],
) -> Result<()> {
self.conn
.execute(
"DELETE FROM source_files WHERE source_root_id = ?1",
[source_root_id],
)
.await
.with_context(|| format!("failed to clear source files for {source_root_id}"))?;
for file in files {
self.conn
.execute(
r#"
INSERT INTO source_files (
source_root_id, relative_path, absolute_path, fingerprint,
size_bytes, modified_at, file_type, status, last_error, updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, CURRENT_TIMESTAMP)
"#,
(
file.source_root_id.clone(),
file.relative_path.clone(),
file.absolute_path.clone(),
file.fingerprint.clone(),
file.size_bytes,
file.modified_at.clone(),
file.file_type.clone(),
file.status.clone(),
file.last_error.clone(),
),
)
.await
.with_context(|| format!("failed to persist source file {}", file.relative_path))?;
}
Ok(())
}
pub async fn create_ingest_job(
&self,
source_root_id: Option<&str>,
scope_label: &str,
) -> Result<IngestJobSummary> {
let job_id = Uuid::new_v4().to_string();
self.conn
.execute(
r#"
INSERT INTO ingest_jobs (id, source_root_id, scope_label, status, file_count, started_at)
VALUES (?1, ?2, ?3, 'running', 0, CURRENT_TIMESTAMP)
"#,
(
job_id.clone(),
source_root_id.map(str::to_string),
scope_label.to_string(),
),
)
.await
.with_context(|| format!("failed to create ingest job for {scope_label}"))?;
self.find_ingest_job(&job_id)
.await?
.with_context(|| format!("created ingest job missing for {job_id}"))
}
pub async fn complete_ingest_job(
&self,
job_id: &str,
status: &str,
file_count: usize,
last_error: Option<&str>,
) -> Result<IngestJobSummary> {
self.conn
.execute(
r#"
UPDATE ingest_jobs
SET status = ?2, file_count = ?3, completed_at = CURRENT_TIMESTAMP, last_error = ?4
WHERE id = ?1
"#,
(
job_id.to_string(),
status.to_string(),
file_count as i64,
last_error.map(str::to_string),
),
)
.await
.with_context(|| format!("failed to complete ingest job {job_id}"))?;
self.find_ingest_job(job_id)
.await?
.with_context(|| format!("completed ingest job missing for {job_id}"))
}
pub async fn list_ingest_jobs(&self) -> Result<Vec<IngestJobSummary>> {
let mut rows = self
.conn
.query(
"SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs ORDER BY started_at DESC LIMIT 20",
(),
)
.await
.context("failed to query ingest jobs")?;
let mut jobs = Vec::new();
while let Some(row) = rows.next().await.context("failed to read ingest job row")? {
jobs.push(decode_ingest_job_row(&row)?);
}
Ok(jobs)
}
pub async fn load_ingest_status(&self) -> Result<IngestStatusResponse> {
let jobs = self.list_ingest_jobs().await?;
let running = jobs.iter().any(|job| job.status == "running");
let total_source_files = self.count_source_files().await?;
let indexed_text_files = self.count_indexed_text_files().await?;
Ok(IngestStatusResponse {
running,
total_source_files,
indexed_text_files,
jobs,
})
}
async fn find_ingest_job(&self, job_id: &str) -> Result<Option<IngestJobSummary>> {
let mut stmt = self
.conn
.prepare(
"SELECT id, source_root_id, scope_label, status, file_count, started_at, completed_at, last_error FROM ingest_jobs WHERE id = ?1 LIMIT 1",
)
.await
.context("failed to prepare ingest job lookup")?;
let mut rows = stmt
.query([job_id])
.await
.with_context(|| format!("failed to lookup ingest job {job_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read ingest job lookup row")?
else {
return Ok(None);
};
Ok(Some(decode_ingest_job_row(&row)?))
}
async fn count_source_files(&self) -> Result<usize> {
let mut rows = self
.conn
.query("SELECT COUNT(*) FROM source_files", ())
.await
.context("failed to count source files")?;
let Some(row) = rows
.next()
.await
.context("failed to read source file count")?
else {
return Ok(0);
};
let count: i64 = row.get(0).context("failed to decode source file count")?;
Ok(count as usize)
}
async fn count_indexed_text_files(&self) -> Result<usize> {
let mut rows = self
.conn
.query(
"SELECT COUNT(*) FROM source_files WHERE status = 'indexed'",
(),
)
.await
.context("failed to count indexed text files")?;
let Some(row) = rows
.next()
.await
.context("failed to read indexed text file count")?
else {
return Ok(0);
};
let count: i64 = row
.get(0)
.context("failed to decode indexed text file count")?;
Ok(count as usize)
}
pub async fn list_indexed_source_files(&self) -> Result<Vec<IndexedSourceFileRow>> {
let mut rows = self
.conn
.query(
"SELECT source_root_id, relative_path, absolute_path FROM source_files WHERE status = 'indexed' ORDER BY source_root_id, relative_path",
(),
)
.await
.context("failed to query indexed source files")?;
let mut files = Vec::new();
while let Some(row) = rows
.next()
.await
.context("failed to read indexed source file row")?
{
files.push(IndexedSourceFileRow {
source_root_id: row
.get(0)
.context("failed to decode indexed source root id")?,
relative_path: row
.get(1)
.context("failed to decode indexed source relative path")?,
absolute_path: row
.get(2)
.context("failed to decode indexed source absolute path")?,
});
}
Ok(files)
}
async fn insert_conversation(
&self,
session_id: &str,
conversation_id: &str,
title: Option<&str>,
) -> Result<()> {
let normalized_title = normalize_conversation_title(title);
self.conn
.execute(
r#"
INSERT OR IGNORE INTO conversations (id, session_id, title, created_at, updated_at, last_message_at)
VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL)
"#,
(
conversation_id.to_string(),
session_id.to_string(),
normalized_title.clone(),
),
)
.await
.with_context(|| format!("failed to create conversation {conversation_id}"))?;
if self
.find_conversation(session_id, conversation_id)
.await?
.is_none()
{
anyhow::bail!("conversation belongs to a different session");
}
self.conn
.execute(
r#"
UPDATE conversations
SET title = CASE
WHEN title = ?2 AND ?3 IS NOT NULL THEN ?3
ELSE title
END
WHERE id = ?1
"#,
(
conversation_id.to_string(),
DEFAULT_CONVERSATION_TITLE.to_string(),
title.map(conversation_title_from_message),
),
)
.await
.with_context(|| {
format!("failed to normalize title for conversation {conversation_id}")
})?;
Ok(())
}
async fn next_message_sequence(&self, conversation_id: &str) -> Result<i64> {
let mut stmt = self
.conn
.prepare(
"SELECT COALESCE(MAX(sequence), 0) + 1 FROM messages WHERE conversation_id = ?1",
)
.await
.context("failed to prepare message sequence lookup")?;
let mut rows = stmt.query([conversation_id]).await.with_context(|| {
format!("failed to query next message sequence for {conversation_id}")
})?;
let Some(row) = rows
.next()
.await
.context("failed to read message sequence row")?
else {
return Ok(1);
};
let sequence: i64 = row.get(0).context("failed to decode message sequence")?;
Ok(sequence)
}
async fn touch_conversation_activity(
&self,
conversation_id: &str,
title_hint: Option<&str>,
) -> Result<()> {
self.conn
.execute(
r#"
UPDATE conversations
SET title = CASE
WHEN ?2 IS NOT NULL AND title = ?3 THEN ?2
ELSE title
END,
updated_at = CURRENT_TIMESTAMP,
last_message_at = CURRENT_TIMESTAMP
WHERE id = ?1
"#,
(
conversation_id.to_string(),
title_hint.map(conversation_title_from_message),
DEFAULT_CONVERSATION_TITLE.to_string(),
),
)
.await
.with_context(|| format!("failed to touch conversation {conversation_id}"))?;
Ok(())
}
async fn find_message(&self, message_id: &str) -> Result<Option<ConversationMessage>> {
let mut stmt = self
.conn
.prepare(
r#"
SELECT id, conversation_id, role, content, status, created_at, provider, model_id
FROM messages
WHERE id = ?1
LIMIT 1
"#,
)
.await
.context("failed to prepare message lookup")?;
let mut rows = stmt
.query([message_id])
.await
.with_context(|| format!("failed to lookup message {message_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read message lookup row")?
else {
return Ok(None);
};
Ok(Some(decode_message_row(&row)?))
}
async fn message_conversation_id(&self, message_id: &str) -> Result<Option<String>> {
let mut stmt = self
.conn
.prepare("SELECT conversation_id FROM messages WHERE id = ?1 LIMIT 1")
.await
.context("failed to prepare message conversation lookup")?;
let mut rows = stmt
.query([message_id])
.await
.with_context(|| format!("failed to lookup message conversation for {message_id}"))?;
let Some(row) = rows
.next()
.await
.context("failed to read message conversation row")?
else {
return Ok(None);
};
let conversation_id: String = row
.get(0)
.context("failed to decode message conversation id")?;
Ok(Some(conversation_id))
}
}
#[derive(Debug, Clone)]
pub struct ProviderStatusRow {
pub provider: String,
pub last_test_ok: Option<bool>,
pub last_test_detail: Option<String>,
pub last_tested_at: Option<String>,
}
fn decode_ingest_job_row(row: &Row) -> Result<IngestJobSummary> {
Ok(IngestJobSummary {
id: row.get(0).context("failed to decode ingest job id")?,
source_root_id: row.get(1).ok(),
scope_label: row.get(2).context("failed to decode ingest scope label")?,
status: row.get(3).context("failed to decode ingest status")?,
file_count: row
.get::<i64>(4)
.context("failed to decode ingest file count")? as usize,
started_at: row.get(5).context("failed to decode ingest started_at")?,
completed_at: row.get(6).ok(),
last_error: row.get(7).ok(),
})
}
fn decode_conversation_row(row: &Row) -> Result<ConversationSummary> {
Ok(ConversationSummary {
id: row.get(0).context("failed to decode conversation id")?,
title: row.get(1).context("failed to decode conversation title")?,
created_at: row
.get(2)
.context("failed to decode conversation created_at")?,
updated_at: row
.get(3)
.context("failed to decode conversation updated_at")?,
last_message_at: row.get(4).ok(),
})
}
fn decode_message_row(row: &Row) -> Result<ConversationMessage> {
Ok(ConversationMessage {
id: row.get(0).context("failed to decode message id")?,
conversation_id: row
.get(1)
.context("failed to decode message conversation_id")?,
role: row.get(2).context("failed to decode message role")?,
content: row.get(3).context("failed to decode message content")?,
status: row.get(4).context("failed to decode message status")?,
created_at: row.get(5).context("failed to decode message created_at")?,
provider: row.get(6).ok(),
model_id: row.get(7).ok(),
})
}
fn decode_workspace_task_run_summary(row: &Row) -> Result<WorkspaceTaskRunSummary> {
let run_id: String = row
.get(0)
.context("failed to decode workspace task run id")?;
let task_id: String = row
.get(1)
.context("failed to decode workspace task run task_id")?;
let status: String = row
.get(3)
.context("failed to decode workspace task run status")?;
let exit_code: Option<i64> = row.get(5).ok();
let stdout_truncated: i64 = row
.get(8)
.context("failed to decode workspace task stdout_truncated")?;
let stderr_truncated: i64 = row
.get(9)
.context("failed to decode workspace task stderr_truncated")?;
let timed_out: i64 = row
.get(10)
.context("failed to decode workspace task timed_out")?;
let cancel_requested: i64 = row
.get(11)
.context("failed to decode workspace task cancel_requested")?;
let started_at_ms: i64 = row
.get(12)
.context("failed to decode workspace task started_at_ms")?;
let completed_at_ms: Option<i64> = row.get(13).ok();
let duration_ms: Option<i64> = row.get(14).ok();
let max_output_bytes: i64 = row
.get(17)
.context("failed to decode workspace task max_output_bytes")?;
Ok(WorkspaceTaskRunSummary {
run_id: Uuid::parse_str(&run_id).with_context(|| format!("invalid run UUID {run_id}"))?,
task_id: parse_workspace_task_id(&task_id)?,
path: row.get(2).context("failed to decode workspace task path")?,
status: parse_workspace_task_run_status(&status)?,
command_label: row
.get(4)
.context("failed to decode workspace task command_label")?,
exit_code: exit_code
.map(|value| {
i32::try_from(value)
.with_context(|| format!("workspace task exit_code out of range: {value}"))
})
.transpose()?,
stdout_tail: row
.get(6)
.context("failed to decode workspace task stdout_tail")?,
stderr_tail: row
.get(7)
.context("failed to decode workspace task stderr_tail")?,
stdout_truncated: stdout_truncated != 0,
stderr_truncated: stderr_truncated != 0,
timed_out: timed_out != 0,
cancel_requested: cancel_requested != 0,
started_at_ms: i64_to_u64(started_at_ms, "workspace task started_at_ms")?,
completed_at_ms: completed_at_ms
.map(|value| i64_to_u64(value, "workspace task completed_at_ms"))
.transpose()?,
duration_ms: duration_ms
.map(|value| i64_to_u64(value, "workspace task duration_ms"))
.transpose()?,
error: row.get(15).ok(),
error_code: row.get(16).ok(),
max_output_bytes: usize::try_from(max_output_bytes).with_context(|| {
format!("workspace task max_output_bytes out of range: {max_output_bytes}")
})?,
})
}
fn decode_workspace_file_change_audit_entry(row: &Row) -> Result<WorkspaceFileChangeAuditEntry> {
let audit_id: String = row
.get(0)
.context("failed to decode workspace file change audit id")?;
let action: String = row
.get(1)
.context("failed to decode workspace file change action")?;
let status: String = row
.get(4)
.context("failed to decode workspace file change status")?;
let size_bytes_before: Option<i64> = row.get(7).ok();
let size_bytes_after: Option<i64> = row.get(8).ok();
let created_at_ms: i64 = row
.get(9)
.context("failed to decode workspace file change created_at_ms")?;
let applied_at_ms: Option<i64> = row.get(10).ok();
Ok(WorkspaceFileChangeAuditEntry {
audit_id: Uuid::parse_str(&audit_id)
.with_context(|| format!("invalid workspace file change audit UUID {audit_id}"))?,
action: parse_workspace_file_change_action(&action)?,
path: row
.get(2)
.context("failed to decode workspace file change path")?,
target_path: row.get(3).ok(),
status: parse_workspace_file_change_audit_status(&status)?,
error: row.get(5).ok(),
error_code: row.get(6).ok(),
size_bytes_before: size_bytes_before
.map(|value| i64_to_u64(value, "workspace file change size_bytes_before"))
.transpose()?,
size_bytes_after: size_bytes_after
.map(|value| i64_to_u64(value, "workspace file change size_bytes_after"))
.transpose()?,
created_at_ms: i64_to_u64(created_at_ms, "workspace file change created_at_ms")?,
applied_at_ms: applied_at_ms
.map(|value| i64_to_u64(value, "workspace file change applied_at_ms"))
.transpose()?,
})
}
fn normalize_conversation_title(title: Option<&str>) -> String {
title
.map(str::trim)
.filter(|value| !value.is_empty())
.map(conversation_title_from_message)
.unwrap_or_else(|| DEFAULT_CONVERSATION_TITLE.to_string())
}
fn conversation_title_from_message(message: &str) -> String {
let trimmed = message.trim();
if trimmed.is_empty() {
return DEFAULT_CONVERSATION_TITLE.to_string();
}
let mut title = String::new();
for ch in trimmed.chars().take(CONVERSATION_TITLE_LIMIT) {
title.push(ch);
}
if trimmed.chars().count() > CONVERSATION_TITLE_LIMIT {
title.push_str("...");
}
title
}
fn workspace_task_id_label(task_id: WorkspaceTaskId) -> &'static str {
match task_id {
WorkspaceTaskId::GitStatus => "git_status",
WorkspaceTaskId::GitDiff => "git_diff",
WorkspaceTaskId::CargoCheck => "cargo_check",
WorkspaceTaskId::NpmCheck => "npm_check",
}
}
fn parse_workspace_task_id(value: &str) -> Result<WorkspaceTaskId> {
match value {
"git_status" => Ok(WorkspaceTaskId::GitStatus),
"git_diff" => Ok(WorkspaceTaskId::GitDiff),
"cargo_check" => Ok(WorkspaceTaskId::CargoCheck),
"npm_check" => Ok(WorkspaceTaskId::NpmCheck),
_ => Err(anyhow::anyhow!("unsupported workspace task id: {value}")),
}
}
fn workspace_task_run_status_label(status: WorkspaceTaskRunStatus) -> &'static str {
match status {
WorkspaceTaskRunStatus::Queued => "queued",
WorkspaceTaskRunStatus::Running => "running",
WorkspaceTaskRunStatus::Complete => "complete",
WorkspaceTaskRunStatus::Failed => "failed",
WorkspaceTaskRunStatus::Cancelled => "cancelled",
WorkspaceTaskRunStatus::TimedOut => "timed_out",
}
}
fn parse_workspace_task_run_status(value: &str) -> Result<WorkspaceTaskRunStatus> {
match value {
"queued" => Ok(WorkspaceTaskRunStatus::Queued),
"running" => Ok(WorkspaceTaskRunStatus::Running),
"complete" => Ok(WorkspaceTaskRunStatus::Complete),
"failed" => Ok(WorkspaceTaskRunStatus::Failed),
"cancelled" => Ok(WorkspaceTaskRunStatus::Cancelled),
"timed_out" => Ok(WorkspaceTaskRunStatus::TimedOut),
_ => Err(anyhow::anyhow!(
"unsupported workspace task run status: {value}"
)),
}
}
fn workspace_file_change_action_label(action: WorkspaceFileChangeAction) -> &'static str {
match action {
WorkspaceFileChangeAction::WriteText => "write_text",
WorkspaceFileChangeAction::DeleteFile => "delete_file",
WorkspaceFileChangeAction::RenamePath => "rename_path",
}
}
fn parse_workspace_file_change_action(value: &str) -> Result<WorkspaceFileChangeAction> {
match value {
"write_text" => Ok(WorkspaceFileChangeAction::WriteText),
"delete_file" => Ok(WorkspaceFileChangeAction::DeleteFile),
"rename_path" => Ok(WorkspaceFileChangeAction::RenamePath),
_ => Err(anyhow::anyhow!(
"unsupported workspace file change action: {value}"
)),
}
}
fn workspace_file_change_audit_status_label(
status: WorkspaceFileChangeAuditStatus,
) -> &'static str {
match status {
WorkspaceFileChangeAuditStatus::Applying => "applying",
WorkspaceFileChangeAuditStatus::Complete => "complete",
WorkspaceFileChangeAuditStatus::Failed => "failed",
}
}
fn parse_workspace_file_change_audit_status(value: &str) -> Result<WorkspaceFileChangeAuditStatus> {
match value {
"applying" => Ok(WorkspaceFileChangeAuditStatus::Applying),
"complete" => Ok(WorkspaceFileChangeAuditStatus::Complete),
"failed" => Ok(WorkspaceFileChangeAuditStatus::Failed),
_ => Err(anyhow::anyhow!(
"unsupported workspace file change audit status: {value}"
)),
}
}
fn bool_to_i64(value: bool) -> i64 {
if value { 1 } else { 0 }
}
fn u64_to_i64(value: u64, label: &str) -> Result<i64> {
i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
}
fn usize_to_i64(value: usize, label: &str) -> Result<i64> {
i64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
}
fn i64_to_u64(value: i64, label: &str) -> Result<u64> {
u64::try_from(value).with_context(|| format!("{label} out of range: {value}"))
}
fn unix_epoch_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as u64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use soma_studio_core::{
AppConfig, ProviderSelectionResponse, SourceRootSummary, WorkspaceFileChangeAction,
WorkspaceFileChangeAuditStatus, WorkspaceFileChangePreviewRequest, WorkspaceTaskId,
WorkspaceTaskRunStatus, WorkspaceTaskRunSummary,
};
use turso::Builder;
use uuid::Uuid;
use super::{
DEFAULT_CONVERSATION_TITLE, NewConversationMessage, STORAGE_SCHEMA_VERSION, StudioStorage,
WORKSPACE_HISTORY_RETAIN_PER_SESSION,
};
#[tokio::test]
async fn source_root_roundtrip_persists() {
let temp_dir = std::env::temp_dir().join(format!("soma-studio-storage-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let source_root = SourceRootSummary {
id: Uuid::new_v4(),
path: "F:/docs".to_string(),
read_only: true,
};
let first = storage
.upsert_source_root(&source_root)
.await
.expect("insert");
let second = storage
.upsert_source_root(&source_root)
.await
.expect("duplicate");
let listed = storage.list_source_roots().await.expect("list");
assert_eq!(first.id, second.id);
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].path, "F:/docs");
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn provider_selection_roundtrip_persists() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-selection-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let saved = storage
.save_provider_selection(&ProviderSelectionResponse {
selected_provider: Some("ollama".to_string()),
selected_model_id: Some("qwen3:8b".to_string()),
})
.await
.expect("save selection");
let loaded = storage
.load_provider_selection()
.await
.expect("load selection");
assert_eq!(saved.selected_provider.as_deref(), Some("ollama"));
assert_eq!(loaded.selected_model_id.as_deref(), Some("qwen3:8b"));
let cleared = storage
.save_provider_selection(&ProviderSelectionResponse {
selected_provider: None,
selected_model_id: None,
})
.await
.expect("clear selection");
assert!(cleared.selected_provider.is_none());
assert!(cleared.selected_model_id.is_none());
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn session_roundtrip_persists() {
let temp_dir = std::env::temp_dir().join(format!("soma-studio-session-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let session_id = Uuid::new_v4().to_string();
assert!(
!storage
.session_exists(&session_id)
.await
.expect("session missing")
);
storage
.persist_session(&session_id)
.await
.expect("persist session");
assert!(
storage
.session_exists(&session_id)
.await
.expect("session exists")
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn workspace_task_run_history_roundtrip_persists() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-task-runs-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let session_id = Uuid::new_v4().to_string();
let run_id = Uuid::new_v4();
let mut summary = WorkspaceTaskRunSummary {
run_id,
task_id: WorkspaceTaskId::GitStatus,
path: ".".to_string(),
status: WorkspaceTaskRunStatus::Running,
command_label: "git status --short --branch".to_string(),
exit_code: None,
stdout_tail: String::new(),
stderr_tail: String::new(),
stdout_truncated: false,
stderr_truncated: false,
timed_out: false,
cancel_requested: false,
started_at_ms: 1_000,
completed_at_ms: None,
duration_ms: None,
error: None,
error_code: None,
max_output_bytes: 64 * 1024,
};
storage
.upsert_workspace_task_run(&session_id, &summary)
.await
.expect("persist running task");
summary.status = WorkspaceTaskRunStatus::Complete;
summary.exit_code = Some(0);
summary.stdout_tail = "## main\n".to_string();
summary.completed_at_ms = Some(1_050);
summary.duration_ms = Some(50);
summary.error_code = None;
storage
.upsert_workspace_task_run(&session_id, &summary)
.await
.expect("persist completed task");
let listed = storage
.list_workspace_task_runs(&session_id, 10, None)
.await
.expect("list task runs");
let loaded = storage
.get_workspace_task_run(&session_id, run_id)
.await
.expect("load task run")
.expect("task run exists");
assert_eq!(listed.len(), 1);
assert_eq!(loaded.run_id, run_id);
assert_eq!(loaded.status, WorkspaceTaskRunStatus::Complete);
assert_eq!(loaded.stdout_tail, "## main\n");
assert_eq!(loaded.duration_ms, Some(50));
assert!(
storage
.list_workspace_task_runs("other-session", 10, None)
.await
.expect("other session task runs")
.is_empty()
);
let interrupted_id = Uuid::new_v4();
let interrupted = WorkspaceTaskRunSummary {
run_id: interrupted_id,
task_id: WorkspaceTaskId::CargoCheck,
path: ".".to_string(),
status: WorkspaceTaskRunStatus::Running,
command_label: "cargo check --workspace".to_string(),
exit_code: None,
stdout_tail: String::new(),
stderr_tail: String::new(),
stdout_truncated: false,
stderr_truncated: false,
timed_out: false,
cancel_requested: false,
started_at_ms: 1_100,
completed_at_ms: None,
duration_ms: None,
error: None,
error_code: None,
max_output_bytes: 64 * 1024,
};
storage
.upsert_workspace_task_run(&session_id, &interrupted)
.await
.expect("persist interrupted candidate");
drop(storage);
let reopened = StudioStorage::open(&config)
.await
.expect("reopened storage");
let interrupted_loaded = reopened
.get_workspace_task_run(&session_id, interrupted_id)
.await
.expect("load interrupted")
.expect("interrupted task exists");
assert_eq!(interrupted_loaded.status, WorkspaceTaskRunStatus::Failed);
assert_eq!(
interrupted_loaded.error.as_deref(),
Some("workspace task interrupted before completion")
);
assert_eq!(
interrupted_loaded.error_code.as_deref(),
Some("workspace_task_interrupted")
);
assert!(interrupted_loaded.completed_at_ms.is_some());
let filtered_interrupted = reopened
.list_workspace_task_runs(&session_id, 10, Some("workspace_task_interrupted"))
.await
.expect("list interrupted task runs");
assert_eq!(filtered_interrupted.len(), 1);
assert_eq!(filtered_interrupted[0].run_id, interrupted_id);
assert!(
reopened
.list_workspace_task_runs(&session_id, 10, Some("workspace_task_failed_exit"))
.await
.expect("list failed exit task runs")
.is_empty()
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn workspace_file_change_audit_roundtrip_persists_without_content() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-file-audit-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let session_id = Uuid::new_v4().to_string();
let request = WorkspaceFileChangePreviewRequest {
action: WorkspaceFileChangeAction::WriteText,
path: "docs/new.md".to_string(),
target_path: None,
content: Some("do not persist this content".to_string()),
expected_modified_at_ms: None,
};
let started = storage
.start_workspace_file_change_audit(&session_id, &request, None)
.await
.expect("start file audit");
assert_eq!(started.status, WorkspaceFileChangeAuditStatus::Applying);
assert_eq!(started.path, "docs/new.md");
assert_eq!(started.size_bytes_before, None);
let completed = storage
.finish_workspace_file_change_audit(
&session_id,
started.audit_id,
WorkspaceFileChangeAuditStatus::Complete,
None,
None,
Some(12),
)
.await
.expect("finish file audit");
let listed = storage
.list_workspace_file_change_audits(&session_id, 10, None)
.await
.expect("list file audits");
assert_eq!(completed.status, WorkspaceFileChangeAuditStatus::Complete);
assert_eq!(completed.size_bytes_after, Some(12));
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].audit_id, started.audit_id);
assert_eq!(listed[0].error, None);
assert_eq!(listed[0].error_code, None);
assert!(
storage
.list_workspace_file_change_audits("other-session", 10, None)
.await
.expect("other session file audits")
.is_empty()
);
let interrupted_request = WorkspaceFileChangePreviewRequest {
action: WorkspaceFileChangeAction::RenamePath,
path: "docs/new.md".to_string(),
target_path: Some("docs/renamed.md".to_string()),
content: None,
expected_modified_at_ms: Some(1_000),
};
let interrupted = storage
.start_workspace_file_change_audit(&session_id, &interrupted_request, Some(12))
.await
.expect("start interrupted file audit");
drop(storage);
let reopened = StudioStorage::open(&config)
.await
.expect("reopened storage");
let reopened_audits = reopened
.list_workspace_file_change_audits(&session_id, 10, None)
.await
.expect("reopened file audits");
let interrupted_loaded = reopened_audits
.iter()
.find(|audit| audit.audit_id == interrupted.audit_id)
.expect("interrupted audit exists");
assert_eq!(
interrupted_loaded.status,
WorkspaceFileChangeAuditStatus::Failed
);
assert_eq!(
interrupted_loaded.error.as_deref(),
Some("workspace file change interrupted before completion")
);
assert_eq!(
interrupted_loaded.error_code.as_deref(),
Some("workspace_file_change_interrupted")
);
assert!(interrupted_loaded.applied_at_ms.is_some());
let filtered_interrupted = reopened
.list_workspace_file_change_audits(
&session_id,
10,
Some("workspace_file_change_interrupted"),
)
.await
.expect("list interrupted file audits");
assert_eq!(filtered_interrupted.len(), 1);
assert_eq!(filtered_interrupted[0].audit_id, interrupted.audit_id);
assert!(
reopened
.list_workspace_file_change_audits(&session_id, 10, Some("workspace_conflict"))
.await
.expect("list conflict file audits")
.is_empty()
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn workspace_task_run_schema_adds_error_code_to_existing_db() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-task-run-migration-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let db_path = config.db_path.to_str().expect("db path");
let db = Builder::new_local(db_path).build().await.expect("db");
let conn = db.connect().expect("connect");
conn.execute_batch(
r#"
CREATE TABLE workspace_task_runs (
run_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
task_id TEXT NOT NULL,
path TEXT NOT NULL,
status TEXT NOT NULL,
command_label TEXT NOT NULL,
exit_code INTEGER,
stdout_tail TEXT NOT NULL,
stderr_tail TEXT NOT NULL,
stdout_truncated INTEGER NOT NULL,
stderr_truncated INTEGER NOT NULL,
timed_out INTEGER NOT NULL,
cancel_requested INTEGER NOT NULL,
started_at_ms INTEGER NOT NULL,
completed_at_ms INTEGER,
duration_ms INTEGER,
error TEXT,
max_output_bytes INTEGER NOT NULL,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"#,
)
.await
.expect("old workspace task schema");
drop(conn);
drop(db);
let storage = StudioStorage::open(&config)
.await
.expect("migrated storage");
let schema_status = storage.schema_status().await.expect("schema status");
assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
assert!(schema_status.is_current());
let session_id = Uuid::new_v4().to_string();
let run_id = Uuid::new_v4();
storage
.upsert_workspace_task_run(
&session_id,
&WorkspaceTaskRunSummary {
run_id,
task_id: WorkspaceTaskId::GitStatus,
path: ".".to_string(),
status: WorkspaceTaskRunStatus::Failed,
command_label: "git status --short --branch".to_string(),
exit_code: Some(1),
stdout_tail: String::new(),
stderr_tail: "failed".to_string(),
stdout_truncated: false,
stderr_truncated: false,
timed_out: false,
cancel_requested: false,
started_at_ms: 1_000,
completed_at_ms: Some(1_010),
duration_ms: Some(10),
error: Some("workspace task exited with code 1".to_string()),
error_code: Some("workspace_task_failed_exit".to_string()),
max_output_bytes: 64 * 1024,
},
)
.await
.expect("persist migrated task run");
let loaded = storage
.get_workspace_task_run(&session_id, run_id)
.await
.expect("load migrated task run")
.expect("migrated task run exists");
assert_eq!(
loaded.error_code.as_deref(),
Some("workspace_task_failed_exit")
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn workspace_file_change_audit_schema_adds_error_code_to_existing_db() {
let temp_dir = std::env::temp_dir().join(format!(
"soma-studio-file-audit-migration-{}",
Uuid::new_v4()
));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let db_path = config.db_path.to_str().expect("db path");
let db = Builder::new_local(db_path).build().await.expect("db");
let conn = db.connect().expect("connect");
conn.execute_batch(
r#"
CREATE TABLE workspace_file_change_audits (
audit_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
action TEXT NOT NULL,
path TEXT NOT NULL,
target_path TEXT,
status TEXT NOT NULL,
error TEXT,
size_bytes_before INTEGER,
size_bytes_after INTEGER,
created_at_ms INTEGER NOT NULL,
applied_at_ms INTEGER,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"#,
)
.await
.expect("old workspace file audit schema");
drop(conn);
drop(db);
let storage = StudioStorage::open(&config)
.await
.expect("migrated storage");
let schema_status = storage.schema_status().await.expect("schema status");
assert_eq!(schema_status.current_version, STORAGE_SCHEMA_VERSION);
assert!(schema_status.is_current());
let session_id = Uuid::new_v4().to_string();
let request = WorkspaceFileChangePreviewRequest {
action: WorkspaceFileChangeAction::RenamePath,
path: "docs/new.md".to_string(),
target_path: Some("docs/renamed.md".to_string()),
content: None,
expected_modified_at_ms: Some(1_000),
};
let started = storage
.start_workspace_file_change_audit(&session_id, &request, Some(10))
.await
.expect("start migrated file audit");
let failed = storage
.finish_workspace_file_change_audit(
&session_id,
started.audit_id,
WorkspaceFileChangeAuditStatus::Failed,
Some("workspace rename target already exists"),
Some("workspace_conflict"),
Some(10),
)
.await
.expect("finish migrated file audit");
assert_eq!(failed.error_code.as_deref(), Some("workspace_conflict"));
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn storage_rejects_newer_schema_version() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-newer-schema-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let db_path = config.db_path.to_str().expect("db path");
let db = Builder::new_local(db_path).build().await.expect("db");
let conn = db.connect().expect("connect");
conn.execute(
&format!("PRAGMA user_version = {}", STORAGE_SCHEMA_VERSION + 1),
(),
)
.await
.expect("set newer schema version");
drop(conn);
drop(db);
let error = StudioStorage::open(&config)
.await
.expect_err("newer schema must be rejected");
assert!(
error.to_string().contains("newer than supported version"),
"{error}"
);
let db = Builder::new_local(db_path)
.build()
.await
.expect("db reopen");
let conn = db.connect().expect("connect reopened db");
let mut rows = conn
.query(
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'source_roots'",
(),
)
.await
.expect("query table count");
let row = rows
.next()
.await
.expect("read table count")
.expect("table count row");
let table_count: i64 = row.get(0).expect("decode table count");
assert_eq!(table_count, 0);
drop(rows);
drop(conn);
drop(db);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn workspace_history_retention_limits_rows_per_session() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-history-retention-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let session_id = Uuid::new_v4().to_string();
let request = WorkspaceFileChangePreviewRequest {
action: WorkspaceFileChangeAction::DeleteFile,
path: "docs/old.md".to_string(),
target_path: None,
content: None,
expected_modified_at_ms: Some(1_000),
};
for index in 0..205 {
storage
.upsert_workspace_task_run(
&session_id,
&WorkspaceTaskRunSummary {
run_id: Uuid::new_v4(),
task_id: WorkspaceTaskId::GitStatus,
path: ".".to_string(),
status: WorkspaceTaskRunStatus::Complete,
command_label: "git status --short --branch".to_string(),
exit_code: Some(0),
stdout_tail: String::new(),
stderr_tail: String::new(),
stdout_truncated: false,
stderr_truncated: false,
timed_out: false,
cancel_requested: false,
started_at_ms: index,
completed_at_ms: Some(index + 1),
duration_ms: Some(1),
error: None,
error_code: None,
max_output_bytes: 64 * 1024,
},
)
.await
.expect("persist retained task run");
storage
.start_workspace_file_change_audit(&session_id, &request, Some(index))
.await
.expect("persist retained file audit");
}
let task_run_count = count_rows(
&storage,
"SELECT COUNT(*) FROM workspace_task_runs WHERE session_id = ?1",
&session_id,
)
.await;
let file_audit_count = count_rows(
&storage,
"SELECT COUNT(*) FROM workspace_file_change_audits WHERE session_id = ?1",
&session_id,
)
.await;
assert_eq!(task_run_count, WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64);
assert_eq!(
file_audit_count,
WORKSPACE_HISTORY_RETAIN_PER_SESSION as i64
);
let _ = std::fs::remove_dir_all(temp_dir);
}
#[tokio::test]
async fn conversation_and_message_roundtrip_persists() {
let temp_dir =
std::env::temp_dir().join(format!("soma-studio-conversation-{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("temp dir");
let config = test_config(&temp_dir);
let storage = StudioStorage::open(&config).await.expect("storage");
let conversation = storage
.create_conversation("session-a", None)
.await
.expect("conversation");
assert_eq!(conversation.title, DEFAULT_CONVERSATION_TITLE);
let user_message = storage
.create_message(&NewConversationMessage {
conversation_id: conversation.id.clone(),
role: "user".to_string(),
content: "Discuss the ingest milestone split".to_string(),
status: "complete".to_string(),
provider: None,
model_id: None,
})
.await
.expect("user message");
let assistant_message = storage
.create_message(&NewConversationMessage {
conversation_id: conversation.id.clone(),
role: "assistant".to_string(),
content: String::new(),
status: "streaming".to_string(),
provider: Some("ollama".to_string()),
model_id: Some("qwen3:8b".to_string()),
})
.await
.expect("assistant placeholder");
let partial_assistant = storage
.append_message_delta(&assistant_message.id, "Start with conversation ")
.await
.expect("assistant delta");
let completed_assistant = storage
.update_message_content(
&assistant_message.id,
"Start with conversation persistence and restore APIs.",
"complete",
)
.await
.expect("assistant completion");
let conversations = storage
.list_conversations("session-a")
.await
.expect("conversations");
let messages = storage
.list_conversation_messages("session-a", &conversation.id)
.await
.expect("messages");
assert_eq!(conversations.len(), 1);
assert_eq!(conversations[0].id, conversation.id);
assert_eq!(conversations[0].title, "Discuss the ingest milestone split");
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].id, user_message.id);
assert_eq!(messages[1].id, assistant_message.id);
assert_eq!(partial_assistant.content, "Start with conversation ");
assert_eq!(partial_assistant.status, "streaming");
assert_eq!(completed_assistant.status, "complete");
assert_eq!(completed_assistant.provider.as_deref(), Some("ollama"));
assert!(
storage
.list_conversations("session-b")
.await
.expect("session-b conversations")
.is_empty()
);
assert!(
storage
.list_conversation_messages("session-b", &conversation.id)
.await
.expect("session-b messages")
.is_empty()
);
let deleted = storage
.delete_conversation("session-a", &conversation.id)
.await
.expect("delete conversation");
let messages_after_delete = storage
.list_conversation_messages("session-a", &conversation.id)
.await
.expect("messages after delete");
assert!(deleted);
assert!(messages_after_delete.is_empty());
assert!(
storage
.list_conversations("session-a")
.await
.expect("conversations after delete")
.is_empty()
);
let _ = std::fs::remove_dir_all(temp_dir);
}
fn test_config(temp_dir: &std::path::Path) -> AppConfig {
AppConfig {
app_name: "Soma Studio".to_string(),
bind_addr: "127.0.0.1:0".to_string(),
project_root: temp_dir.to_path_buf(),
data_dir: temp_dir.to_path_buf(),
derived_dir: temp_dir.join("derived"),
notebook_dir: temp_dir.join("notebook"),
user_assets_dir: temp_dir.join("assets"),
db_path: temp_dir.join("test.db"),
web_build_dir: PathBuf::from("unused"),
web_shell_file: PathBuf::from("unused/spa.html"),
}
}
async fn count_rows(storage: &StudioStorage, sql: &str, session_id: &str) -> i64 {
let mut rows = storage
.conn
.query(sql, [session_id])
.await
.expect("count query");
let row = rows
.next()
.await
.expect("count row read")
.expect("count row");
row.get(0).expect("count value")
}
}