1use crate::config::paths::Paths;
2use crate::conversation::message::Message;
3use crate::conversation::Conversation;
4use crate::model::ModelConfig;
5use crate::providers::base::{Provider, MSG_COUNT_FOR_SESSION_NAME_GENERATION};
6use crate::recipe::Recipe;
7use crate::session::extension_data::ExtensionData;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use rmcp::model::Role;
11use serde::{Deserialize, Serialize};
12use sqlx::sqlite::SqliteConnectOptions;
13use sqlx::{Pool, Sqlite};
14use std::collections::HashMap;
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use tokio::sync::OnceCell;
19use tracing::{info, warn};
20use utoipa::ToSchema;
21
/// Schema version stamped on a freshly created database and the target that
/// existing databases are migrated up to.
pub const CURRENT_SCHEMA_VERSION: i32 = 6;
/// Name of the sub-directory of the data dir that holds session storage.
pub const SESSIONS_FOLDER: &str = "sessions";
/// File name of the SQLite database inside the sessions folder.
pub const DB_NAME: &str = "sessions.db";
25
/// Category tag attached to every session.
///
/// Serialized by serde in `snake_case`; the `Display` and `FromStr` impls in
/// this file use the same spellings. `User` is the default variant.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionType {
    #[default]
    User,
    Scheduled,
    SubAgent,
    Hidden,
    Terminal,
}
36
37impl std::fmt::Display for SessionType {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 SessionType::User => write!(f, "user"),
41 SessionType::SubAgent => write!(f, "sub_agent"),
42 SessionType::Hidden => write!(f, "hidden"),
43 SessionType::Scheduled => write!(f, "scheduled"),
44 SessionType::Terminal => write!(f, "terminal"),
45 }
46 }
47}
48
49impl std::str::FromStr for SessionType {
50 type Err = anyhow::Error;
51
52 fn from_str(s: &str) -> Result<Self, Self::Err> {
53 match s {
54 "user" => Ok(SessionType::User),
55 "sub_agent" => Ok(SessionType::SubAgent),
56 "hidden" => Ok(SessionType::Hidden),
57 "scheduled" => Ok(SessionType::Scheduled),
58 "terminal" => Ok(SessionType::Terminal),
59 _ => Err(anyhow::anyhow!("Invalid session type: {}", s)),
60 }
61 }
62}
63
/// Process-wide storage handle, initialized lazily on the first call to
/// `SessionManager::instance` and shared by every later call.
static SESSION_STORAGE: OnceCell<Arc<SessionStorage>> = OnceCell::const_new();
65
/// A stored session: its metadata plus, optionally, its conversation.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Session {
    pub id: String,
    #[schema(value_type = String)]
    pub working_dir: PathBuf,
    /// Display name; when deserializing, the key `description` is accepted too.
    #[serde(alias = "description")]
    pub name: String,
    /// `true` once a name was set through `user_provided_name`; such sessions
    /// are skipped by `SessionManager::maybe_update_name`.
    #[serde(default)]
    pub user_set_name: bool,
    #[serde(default)]
    pub session_type: SessionType,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub extension_data: ExtensionData,
    pub total_tokens: Option<i32>,
    pub input_tokens: Option<i32>,
    pub output_tokens: Option<i32>,
    pub accumulated_total_tokens: Option<i32>,
    pub accumulated_input_tokens: Option<i32>,
    pub accumulated_output_tokens: Option<i32>,
    pub schedule_id: Option<String>,
    pub recipe: Option<Recipe>,
    pub user_recipe_values: Option<HashMap<String, String>>,
    /// Always `None` when built from a database row; `get_session` fills it
    /// in when messages are requested.
    pub conversation: Option<Conversation>,
    /// Number of messages belonging to the session.
    pub message_count: usize,
    pub provider_name: Option<String>,
    pub model_config: Option<ModelConfig>,
}
94
/// Accumulates the set of columns to change on one session row.
///
/// A field left as `None` is not touched by the update. For the
/// `Option<Option<T>>` fields the outer `Option` says whether the column is
/// part of the update, and an inner `None` is bound as SQL `NULL`.
pub struct SessionUpdateBuilder {
    // Id of the row the update is applied to.
    session_id: String,
    name: Option<String>,
    user_set_name: Option<bool>,
    session_type: Option<SessionType>,
    working_dir: Option<PathBuf>,
    extension_data: Option<ExtensionData>,
    total_tokens: Option<Option<i32>>,
    input_tokens: Option<Option<i32>>,
    output_tokens: Option<Option<i32>>,
    accumulated_total_tokens: Option<Option<i32>>,
    accumulated_input_tokens: Option<Option<i32>>,
    accumulated_output_tokens: Option<Option<i32>>,
    schedule_id: Option<Option<String>>,
    recipe: Option<Option<Recipe>>,
    user_recipe_values: Option<Option<HashMap<String, String>>>,
    provider_name: Option<Option<String>>,
    model_config: Option<Option<ModelConfig>>,
}
114
/// Aggregate figures over all stored sessions, serialized with camelCase keys.
#[derive(Serialize, ToSchema, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SessionInsights {
    /// Number of rows in the `sessions` table.
    pub total_sessions: usize,
    /// Sum over sessions of the accumulated token total, falling back to the
    /// plain token total (or zero) where the accumulated value is absent.
    pub total_tokens: i64,
}
121
122impl SessionUpdateBuilder {
123 fn new(session_id: String) -> Self {
124 Self {
125 session_id,
126 name: None,
127 user_set_name: None,
128 session_type: None,
129 working_dir: None,
130 extension_data: None,
131 total_tokens: None,
132 input_tokens: None,
133 output_tokens: None,
134 accumulated_total_tokens: None,
135 accumulated_input_tokens: None,
136 accumulated_output_tokens: None,
137 schedule_id: None,
138 recipe: None,
139 user_recipe_values: None,
140 provider_name: None,
141 model_config: None,
142 }
143 }
144
145 pub fn user_provided_name(mut self, name: impl Into<String>) -> Self {
146 let name = name.into().trim().to_string();
147 if !name.is_empty() {
148 self.name = Some(name);
149 self.user_set_name = Some(true);
150 }
151 self
152 }
153
154 pub fn system_generated_name(mut self, name: impl Into<String>) -> Self {
155 let name = name.into().trim().to_string();
156 if !name.is_empty() {
157 self.name = Some(name);
158 self.user_set_name = Some(false);
159 }
160 self
161 }
162
163 pub fn session_type(mut self, session_type: SessionType) -> Self {
164 self.session_type = Some(session_type);
165 self
166 }
167
168 pub fn working_dir(mut self, working_dir: PathBuf) -> Self {
169 self.working_dir = Some(working_dir);
170 self
171 }
172
173 pub fn extension_data(mut self, data: ExtensionData) -> Self {
174 self.extension_data = Some(data);
175 self
176 }
177
178 pub fn total_tokens(mut self, tokens: Option<i32>) -> Self {
179 self.total_tokens = Some(tokens);
180 self
181 }
182
183 pub fn input_tokens(mut self, tokens: Option<i32>) -> Self {
184 self.input_tokens = Some(tokens);
185 self
186 }
187
188 pub fn output_tokens(mut self, tokens: Option<i32>) -> Self {
189 self.output_tokens = Some(tokens);
190 self
191 }
192
193 pub fn accumulated_total_tokens(mut self, tokens: Option<i32>) -> Self {
194 self.accumulated_total_tokens = Some(tokens);
195 self
196 }
197
198 pub fn accumulated_input_tokens(mut self, tokens: Option<i32>) -> Self {
199 self.accumulated_input_tokens = Some(tokens);
200 self
201 }
202
203 pub fn accumulated_output_tokens(mut self, tokens: Option<i32>) -> Self {
204 self.accumulated_output_tokens = Some(tokens);
205 self
206 }
207
208 pub fn schedule_id(mut self, schedule_id: Option<String>) -> Self {
209 self.schedule_id = Some(schedule_id);
210 self
211 }
212
213 pub fn recipe(mut self, recipe: Option<Recipe>) -> Self {
214 self.recipe = Some(recipe);
215 self
216 }
217
218 pub fn user_recipe_values(
219 mut self,
220 user_recipe_values: Option<HashMap<String, String>>,
221 ) -> Self {
222 self.user_recipe_values = Some(user_recipe_values);
223 self
224 }
225
226 pub fn provider_name(mut self, provider_name: impl Into<String>) -> Self {
227 self.provider_name = Some(Some(provider_name.into()));
228 self
229 }
230
231 pub fn model_config(mut self, model_config: ModelConfig) -> Self {
232 self.model_config = Some(Some(model_config));
233 self
234 }
235
236 pub async fn apply(self) -> Result<()> {
237 SessionManager::apply_update(self).await
238 }
239}
240
241pub struct SessionManager;
242
243impl SessionManager {
244 pub async fn instance() -> Result<Arc<SessionStorage>> {
245 SESSION_STORAGE
246 .get_or_try_init(|| async { SessionStorage::new().await.map(Arc::new) })
247 .await
248 .map(Arc::clone)
249 }
250
251 pub async fn create_session(
252 working_dir: PathBuf,
253 name: String,
254 session_type: SessionType,
255 ) -> Result<Session> {
256 Self::instance()
257 .await?
258 .create_session(working_dir, name, session_type)
259 .await
260 }
261
262 pub async fn get_session(id: &str, include_messages: bool) -> Result<Session> {
263 Self::instance()
264 .await?
265 .get_session(id, include_messages)
266 .await
267 }
268
269 pub fn update_session(id: &str) -> SessionUpdateBuilder {
270 SessionUpdateBuilder::new(id.to_string())
271 }
272
273 async fn apply_update(builder: SessionUpdateBuilder) -> Result<()> {
274 Self::instance().await?.apply_update(builder).await
275 }
276
277 pub async fn add_message(id: &str, message: &Message) -> Result<()> {
278 Self::instance().await?.add_message(id, message).await
279 }
280
281 pub async fn replace_conversation(id: &str, conversation: &Conversation) -> Result<()> {
282 Self::instance()
283 .await?
284 .replace_conversation(id, conversation)
285 .await
286 }
287
288 pub async fn list_sessions() -> Result<Vec<Session>> {
289 Self::instance().await?.list_sessions().await
290 }
291
292 pub async fn list_sessions_by_types(types: &[SessionType]) -> Result<Vec<Session>> {
293 Self::instance().await?.list_sessions_by_types(types).await
294 }
295
296 pub async fn delete_session(id: &str) -> Result<()> {
297 Self::instance().await?.delete_session(id).await
298 }
299
300 pub async fn get_insights() -> Result<SessionInsights> {
301 Self::instance().await?.get_insights().await
302 }
303
304 pub async fn export_session(id: &str) -> Result<String> {
305 Self::instance().await?.export_session(id).await
306 }
307
308 pub async fn import_session(json: &str) -> Result<Session> {
309 Self::instance().await?.import_session(json).await
310 }
311
312 pub async fn copy_session(session_id: &str, new_name: String) -> Result<Session> {
313 Self::instance()
314 .await?
315 .copy_session(session_id, new_name)
316 .await
317 }
318
319 pub async fn truncate_conversation(session_id: &str, timestamp: i64) -> Result<()> {
320 Self::instance()
321 .await?
322 .truncate_conversation(session_id, timestamp)
323 .await
324 }
325
326 pub async fn maybe_update_name(id: &str, provider: Arc<dyn Provider>) -> Result<()> {
327 let session = Self::get_session(id, true).await?;
328
329 if session.user_set_name {
330 return Ok(());
331 }
332
333 let conversation = session
334 .conversation
335 .ok_or_else(|| anyhow::anyhow!("No messages found"))?;
336
337 let user_message_count = conversation
338 .messages()
339 .iter()
340 .filter(|m| matches!(m.role, Role::User))
341 .count();
342
343 if user_message_count <= MSG_COUNT_FOR_SESSION_NAME_GENERATION {
344 let name = provider.generate_session_name(&conversation).await?;
345 Self::update_session(id)
346 .system_generated_name(name)
347 .apply()
348 .await
349 } else {
350 Ok(())
351 }
352 }
353
354 pub async fn search_chat_history(
355 query: &str,
356 limit: Option<usize>,
357 after_date: Option<chrono::DateTime<chrono::Utc>>,
358 before_date: Option<chrono::DateTime<chrono::Utc>>,
359 exclude_session_id: Option<String>,
360 ) -> Result<crate::session::chat_history_search::ChatRecallResults> {
361 Self::instance()
362 .await?
363 .search_chat_history(query, limit, after_date, before_date, exclude_session_id)
364 .await
365 }
366}
367
/// SQLite-backed store for sessions and their messages.
pub struct SessionStorage {
    // Connection pool every query in this type runs against.
    pool: Pool<Sqlite>,
}
371
372pub fn ensure_session_dir() -> Result<PathBuf> {
373 let session_dir = Paths::data_dir().join(SESSIONS_FOLDER);
374
375 if !session_dir.exists() {
376 fs::create_dir_all(&session_dir)?;
377 }
378
379 Ok(session_dir)
380}
381
382fn role_to_string(role: &Role) -> &'static str {
383 match role {
384 Role::User => "user",
385 Role::Assistant => "assistant",
386 }
387}
388
389impl Default for Session {
390 fn default() -> Self {
391 Self {
392 id: String::new(),
393 working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
394 name: String::new(),
395 user_set_name: false,
396 session_type: SessionType::default(),
397 created_at: Default::default(),
398 updated_at: Default::default(),
399 extension_data: ExtensionData::default(),
400 total_tokens: None,
401 input_tokens: None,
402 output_tokens: None,
403 accumulated_total_tokens: None,
404 accumulated_input_tokens: None,
405 accumulated_output_tokens: None,
406 schedule_id: None,
407 recipe: None,
408 user_recipe_values: None,
409 conversation: None,
410 message_count: 0,
411 provider_name: None,
412 model_config: None,
413 }
414 }
415}
416
417impl Session {
418 pub fn without_messages(mut self) -> Self {
419 self.conversation = None;
420 self
421 }
422}
423
424impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for Session {
425 fn from_row(row: &sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
426 use sqlx::Row;
427
428 let recipe_json: Option<String> = row.try_get("recipe_json")?;
429 let recipe = recipe_json.and_then(|json| serde_json::from_str(&json).ok());
430
431 let user_recipe_values_json: Option<String> = row.try_get("user_recipe_values_json")?;
432 let user_recipe_values =
433 user_recipe_values_json.and_then(|json| serde_json::from_str(&json).ok());
434
435 let model_config_json: Option<String> = row.try_get("model_config_json").ok().flatten();
436 let model_config = model_config_json.and_then(|json| serde_json::from_str(&json).ok());
437
438 let name: String = {
439 let name_val: String = row.try_get("name").unwrap_or_default();
440 if !name_val.is_empty() {
441 name_val
442 } else {
443 row.try_get("description").unwrap_or_default()
444 }
445 };
446
447 let user_set_name = row.try_get("user_set_name").unwrap_or(false);
448
449 let session_type_str: String = row
450 .try_get("session_type")
451 .unwrap_or_else(|_| "user".to_string());
452 let session_type = session_type_str.parse().unwrap_or_default();
453
454 Ok(Session {
455 id: row.try_get("id")?,
456 working_dir: PathBuf::from(row.try_get::<String, _>("working_dir")?),
457 name,
458 user_set_name,
459 session_type,
460 created_at: row.try_get("created_at")?,
461 updated_at: row.try_get("updated_at")?,
462 extension_data: serde_json::from_str(&row.try_get::<String, _>("extension_data")?)
463 .unwrap_or_default(),
464 total_tokens: row.try_get("total_tokens")?,
465 input_tokens: row.try_get("input_tokens")?,
466 output_tokens: row.try_get("output_tokens")?,
467 accumulated_total_tokens: row.try_get("accumulated_total_tokens")?,
468 accumulated_input_tokens: row.try_get("accumulated_input_tokens")?,
469 accumulated_output_tokens: row.try_get("accumulated_output_tokens")?,
470 schedule_id: row.try_get("schedule_id")?,
471 recipe,
472 user_recipe_values,
473 conversation: None,
474 message_count: row.try_get("message_count").unwrap_or(0) as usize,
475 provider_name: row.try_get("provider_name").ok().flatten(),
476 model_config,
477 })
478 }
479}
480
481impl SessionStorage {
482 async fn new() -> Result<Self> {
483 let session_dir = ensure_session_dir()?;
484 let db_path = session_dir.join(DB_NAME);
485
486 let storage = if db_path.exists() {
487 Self::open(&db_path).await?
488 } else {
489 let storage = Self::create(&db_path).await?;
490
491 if let Err(e) = storage.import_legacy(&session_dir).await {
492 warn!("Failed to import some legacy sessions: {}", e);
493 }
494
495 storage
496 };
497
498 Ok(storage)
499 }
500
501 async fn get_pool(db_path: &Path, create_if_missing: bool) -> Result<Pool<Sqlite>> {
502 let options = SqliteConnectOptions::new()
503 .filename(db_path)
504 .create_if_missing(create_if_missing)
505 .busy_timeout(std::time::Duration::from_secs(5))
506 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
507
508 sqlx::SqlitePool::connect_with(options).await.map_err(|e| {
509 anyhow::anyhow!(
510 "Failed to open SQLite database at '{}': {}",
511 db_path.display(),
512 e
513 )
514 })
515 }
516
517 async fn open(db_path: &Path) -> Result<Self> {
518 let pool = Self::get_pool(db_path, false).await?;
519
520 let storage = Self { pool };
521 storage.run_migrations().await?;
522 Ok(storage)
523 }
524
525 async fn create(db_path: &Path) -> Result<Self> {
526 let pool = Self::get_pool(db_path, true).await?;
527
528 sqlx::query(
529 r#"
530 CREATE TABLE schema_version (
531 version INTEGER PRIMARY KEY,
532 applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
533 )
534 "#,
535 )
536 .execute(&pool)
537 .await?;
538
539 sqlx::query("INSERT INTO schema_version (version) VALUES (?)")
540 .bind(CURRENT_SCHEMA_VERSION)
541 .execute(&pool)
542 .await?;
543
544 sqlx::query(
545 r#"
546 CREATE TABLE sessions (
547 id TEXT PRIMARY KEY,
548 name TEXT NOT NULL DEFAULT '',
549 description TEXT NOT NULL DEFAULT '',
550 user_set_name BOOLEAN DEFAULT FALSE,
551 session_type TEXT NOT NULL DEFAULT 'user',
552 working_dir TEXT NOT NULL,
553 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
554 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
555 extension_data TEXT DEFAULT '{}',
556 total_tokens INTEGER,
557 input_tokens INTEGER,
558 output_tokens INTEGER,
559 accumulated_total_tokens INTEGER,
560 accumulated_input_tokens INTEGER,
561 accumulated_output_tokens INTEGER,
562 schedule_id TEXT,
563 recipe_json TEXT,
564 user_recipe_values_json TEXT,
565 provider_name TEXT,
566 model_config_json TEXT
567 )
568 "#,
569 )
570 .execute(&pool)
571 .await?;
572
573 sqlx::query(
574 r#"
575 CREATE TABLE messages (
576 id INTEGER PRIMARY KEY AUTOINCREMENT,
577 session_id TEXT NOT NULL REFERENCES sessions(id),
578 role TEXT NOT NULL,
579 content_json TEXT NOT NULL,
580 created_timestamp INTEGER NOT NULL,
581 timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
582 tokens INTEGER,
583 metadata_json TEXT
584 )
585 "#,
586 )
587 .execute(&pool)
588 .await?;
589
590 sqlx::query("CREATE INDEX idx_messages_session ON messages(session_id)")
591 .execute(&pool)
592 .await?;
593 sqlx::query("CREATE INDEX idx_messages_timestamp ON messages(timestamp)")
594 .execute(&pool)
595 .await?;
596 sqlx::query("CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC)")
597 .execute(&pool)
598 .await?;
599 sqlx::query("CREATE INDEX idx_sessions_type ON sessions(session_type)")
600 .execute(&pool)
601 .await?;
602
603 Ok(Self { pool })
604 }
605
606 async fn import_legacy(&self, session_dir: &PathBuf) -> Result<()> {
607 use crate::session::legacy;
608
609 let sessions = match legacy::list_sessions(session_dir) {
610 Ok(sessions) => sessions,
611 Err(_) => {
612 warn!("No legacy sessions found to import");
613 return Ok(());
614 }
615 };
616
617 if sessions.is_empty() {
618 return Ok(());
619 }
620
621 let mut imported_count = 0;
622 let mut failed_count = 0;
623
624 for (session_name, session_path) in sessions {
625 match legacy::load_session(&session_name, &session_path) {
626 Ok(session) => match self.import_legacy_session(&session).await {
627 Ok(_) => {
628 imported_count += 1;
629 info!(" ✓ Imported: {}", session_name);
630 }
631 Err(e) => {
632 failed_count += 1;
633 info!(" ✗ Failed to import {}: {}", session_name, e);
634 }
635 },
636 Err(e) => {
637 failed_count += 1;
638 info!(" ✗ Failed to load {}: {}", session_name, e);
639 }
640 }
641 }
642
643 info!(
644 "Import complete: {} successful, {} failed",
645 imported_count, failed_count
646 );
647 Ok(())
648 }
649
650 async fn import_legacy_session(&self, session: &Session) -> Result<()> {
651 let mut tx = self.pool.begin().await?;
652
653 let recipe_json = match &session.recipe {
654 Some(recipe) => Some(serde_json::to_string(recipe)?),
655 None => None,
656 };
657
658 let user_recipe_values_json = match &session.user_recipe_values {
659 Some(user_recipe_values) => Some(serde_json::to_string(user_recipe_values)?),
660 None => None,
661 };
662
663 let model_config_json = match &session.model_config {
664 Some(model_config) => Some(serde_json::to_string(model_config)?),
665 None => None,
666 };
667
668 sqlx::query(
669 r#"
670 INSERT INTO sessions (
671 id, name, user_set_name, session_type, working_dir, created_at, updated_at, extension_data,
672 total_tokens, input_tokens, output_tokens,
673 accumulated_total_tokens, accumulated_input_tokens, accumulated_output_tokens,
674 schedule_id, recipe_json, user_recipe_values_json,
675 provider_name, model_config_json
676 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
677 "#,
678 )
679 .bind(&session.id)
680 .bind(&session.name)
681 .bind(session.user_set_name)
682 .bind(session.session_type.to_string())
683 .bind(session.working_dir.to_string_lossy().as_ref())
684 .bind(session.created_at)
685 .bind(session.updated_at)
686 .bind(serde_json::to_string(&session.extension_data)?)
687 .bind(session.total_tokens)
688 .bind(session.input_tokens)
689 .bind(session.output_tokens)
690 .bind(session.accumulated_total_tokens)
691 .bind(session.accumulated_input_tokens)
692 .bind(session.accumulated_output_tokens)
693 .bind(&session.schedule_id)
694 .bind(recipe_json)
695 .bind(user_recipe_values_json)
696 .bind(&session.provider_name)
697 .bind(model_config_json)
698 .execute(&mut *tx)
699 .await?;
700
701 tx.commit().await?;
702
703 if let Some(conversation) = &session.conversation {
704 self.replace_conversation(&session.id, conversation).await?;
705 }
706 Ok(())
707 }
708
709 async fn run_migrations(&self) -> Result<()> {
710 let current_version = self.get_schema_version().await?;
711
712 if current_version < CURRENT_SCHEMA_VERSION {
713 info!(
714 "Running database migrations from v{} to v{}...",
715 current_version, CURRENT_SCHEMA_VERSION
716 );
717
718 for version in (current_version + 1)..=CURRENT_SCHEMA_VERSION {
719 info!(" Applying migration v{}...", version);
720 self.apply_migration(version).await?;
721 self.update_schema_version(version).await?;
722 info!(" ✓ Migration v{} complete", version);
723 }
724
725 info!("All migrations complete");
726 }
727
728 Ok(())
729 }
730
731 async fn get_schema_version(&self) -> Result<i32> {
732 let table_exists = sqlx::query_scalar::<_, bool>(
733 r#"
734 SELECT EXISTS (
735 SELECT name FROM sqlite_master
736 WHERE type='table' AND name='schema_version'
737 )
738 "#,
739 )
740 .fetch_one(&self.pool)
741 .await?;
742
743 if !table_exists {
744 return Ok(0);
745 }
746
747 let version = sqlx::query_scalar::<_, i32>("SELECT MAX(version) FROM schema_version")
748 .fetch_one(&self.pool)
749 .await?;
750
751 Ok(version)
752 }
753
754 async fn update_schema_version(&self, version: i32) -> Result<()> {
755 sqlx::query("INSERT INTO schema_version (version) VALUES (?)")
756 .bind(version)
757 .execute(&self.pool)
758 .await?;
759 Ok(())
760 }
761
762 async fn apply_migration(&self, version: i32) -> Result<()> {
763 match version {
764 1 => {
765 sqlx::query(
766 r#"
767 CREATE TABLE IF NOT EXISTS schema_version (
768 version INTEGER PRIMARY KEY,
769 applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
770 )
771 "#,
772 )
773 .execute(&self.pool)
774 .await?;
775 }
776 2 => {
777 sqlx::query(
778 r#"
779 ALTER TABLE sessions ADD COLUMN user_recipe_values_json TEXT
780 "#,
781 )
782 .execute(&self.pool)
783 .await?;
784 }
785 3 => {
786 sqlx::query(
787 r#"
788 ALTER TABLE messages ADD COLUMN metadata_json TEXT
789 "#,
790 )
791 .execute(&self.pool)
792 .await?;
793 }
794 4 => {
795 sqlx::query(
796 r#"
797 ALTER TABLE sessions ADD COLUMN name TEXT DEFAULT ''
798 "#,
799 )
800 .execute(&self.pool)
801 .await?;
802
803 sqlx::query(
804 r#"
805 ALTER TABLE sessions ADD COLUMN user_set_name BOOLEAN DEFAULT FALSE
806 "#,
807 )
808 .execute(&self.pool)
809 .await?;
810 }
811 5 => {
812 sqlx::query(
813 r#"
814 ALTER TABLE sessions ADD COLUMN session_type TEXT NOT NULL DEFAULT 'user'
815 "#,
816 )
817 .execute(&self.pool)
818 .await?;
819
820 sqlx::query("CREATE INDEX idx_sessions_type ON sessions(session_type)")
821 .execute(&self.pool)
822 .await?;
823 }
824 6 => {
825 sqlx::query(
826 r#"
827 ALTER TABLE sessions ADD COLUMN provider_name TEXT
828 "#,
829 )
830 .execute(&self.pool)
831 .await?;
832
833 sqlx::query(
834 r#"
835 ALTER TABLE sessions ADD COLUMN model_config_json TEXT
836 "#,
837 )
838 .execute(&self.pool)
839 .await?;
840 }
841 _ => {
842 anyhow::bail!("Unknown migration version: {}", version);
843 }
844 }
845
846 Ok(())
847 }
848
849 async fn create_session(
850 &self,
851 working_dir: PathBuf,
852 name: String,
853 session_type: SessionType,
854 ) -> Result<Session> {
855 let mut tx = self.pool.begin().await?;
856
857 let today = chrono::Utc::now().format("%Y%m%d").to_string();
858 let session = sqlx::query_as(
859 r#"
860 INSERT INTO sessions (id, name, user_set_name, session_type, working_dir, extension_data)
861 VALUES (
862 ? || '_' || CAST(COALESCE((
863 SELECT MAX(CAST(SUBSTR(id, 10) AS INTEGER))
864 FROM sessions
865 WHERE id LIKE ? || '_%'
866 ), 0) + 1 AS TEXT),
867 ?,
868 FALSE,
869 ?,
870 ?,
871 '{}'
872 )
873 RETURNING *
874 "#,
875 )
876 .bind(&today)
877 .bind(&today)
878 .bind(&name)
879 .bind(session_type.to_string())
880 .bind(working_dir.to_string_lossy().as_ref())
881 .fetch_one(&mut *tx)
882 .await?;
883
884 tx.commit().await?;
885 crate::posthog::emit_session_started();
886 Ok(session)
887 }
888
889 async fn get_session(&self, id: &str, include_messages: bool) -> Result<Session> {
890 let mut session = sqlx::query_as::<_, Session>(
891 r#"
892 SELECT id, working_dir, name, description, user_set_name, session_type, created_at, updated_at, extension_data,
893 total_tokens, input_tokens, output_tokens,
894 accumulated_total_tokens, accumulated_input_tokens, accumulated_output_tokens,
895 schedule_id, recipe_json, user_recipe_values_json,
896 provider_name, model_config_json
897 FROM sessions
898 WHERE id = ?
899 "#,
900 )
901 .bind(id)
902 .fetch_optional(&self.pool)
903 .await?
904 .ok_or_else(|| anyhow::anyhow!("Session not found"))?;
905
906 if include_messages {
907 let conv = self.get_conversation(&session.id).await?;
908 session.message_count = conv.messages().len();
909 session.conversation = Some(conv);
910 } else {
911 let count =
912 sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM messages WHERE session_id = ?")
913 .bind(&session.id)
914 .fetch_one(&self.pool)
915 .await? as usize;
916 session.message_count = count;
917 }
918
919 Ok(session)
920 }
921
922 #[allow(clippy::too_many_lines)]
923 async fn apply_update(&self, builder: SessionUpdateBuilder) -> Result<()> {
924 let mut updates = Vec::new();
925 let mut query = String::from("UPDATE sessions SET ");
926
927 macro_rules! add_update {
928 ($field:expr, $name:expr) => {
929 if $field.is_some() {
930 if !updates.is_empty() {
931 query.push_str(", ");
932 }
933 updates.push($name);
934 query.push_str($name);
935 query.push_str(" = ?");
936 }
937 };
938 }
939
940 add_update!(builder.name, "name");
941 add_update!(builder.user_set_name, "user_set_name");
942 add_update!(builder.session_type, "session_type");
943 add_update!(builder.working_dir, "working_dir");
944 add_update!(builder.extension_data, "extension_data");
945 add_update!(builder.total_tokens, "total_tokens");
946 add_update!(builder.input_tokens, "input_tokens");
947 add_update!(builder.output_tokens, "output_tokens");
948 add_update!(builder.accumulated_total_tokens, "accumulated_total_tokens");
949 add_update!(builder.accumulated_input_tokens, "accumulated_input_tokens");
950 add_update!(
951 builder.accumulated_output_tokens,
952 "accumulated_output_tokens"
953 );
954 add_update!(builder.schedule_id, "schedule_id");
955 add_update!(builder.recipe, "recipe_json");
956 add_update!(builder.user_recipe_values, "user_recipe_values_json");
957 add_update!(builder.provider_name, "provider_name");
958 add_update!(builder.model_config, "model_config_json");
959
960 if updates.is_empty() {
961 return Ok(());
962 }
963
964 query.push_str(", ");
965 query.push_str("updated_at = datetime('now') WHERE id = ?");
966
967 let mut q = sqlx::query(&query);
968
969 if let Some(name) = builder.name {
970 q = q.bind(name);
971 }
972 if let Some(user_set_name) = builder.user_set_name {
973 q = q.bind(user_set_name);
974 }
975 if let Some(session_type) = builder.session_type {
976 q = q.bind(session_type.to_string());
977 }
978 if let Some(wd) = builder.working_dir {
979 q = q.bind(wd.to_string_lossy().to_string());
980 }
981 if let Some(ed) = builder.extension_data {
982 q = q.bind(serde_json::to_string(&ed)?);
983 }
984 if let Some(tt) = builder.total_tokens {
985 q = q.bind(tt);
986 }
987 if let Some(it) = builder.input_tokens {
988 q = q.bind(it);
989 }
990 if let Some(ot) = builder.output_tokens {
991 q = q.bind(ot);
992 }
993 if let Some(att) = builder.accumulated_total_tokens {
994 q = q.bind(att);
995 }
996 if let Some(ait) = builder.accumulated_input_tokens {
997 q = q.bind(ait);
998 }
999 if let Some(aot) = builder.accumulated_output_tokens {
1000 q = q.bind(aot);
1001 }
1002 if let Some(sid) = builder.schedule_id {
1003 q = q.bind(sid);
1004 }
1005 if let Some(recipe) = builder.recipe {
1006 let recipe_json = recipe.map(|r| serde_json::to_string(&r)).transpose()?;
1007 q = q.bind(recipe_json);
1008 }
1009 if let Some(user_recipe_values) = builder.user_recipe_values {
1010 let user_recipe_values_json = user_recipe_values
1011 .map(|urv| serde_json::to_string(&urv))
1012 .transpose()?;
1013 q = q.bind(user_recipe_values_json);
1014 }
1015 if let Some(provider_name) = builder.provider_name {
1016 q = q.bind(provider_name);
1017 }
1018 if let Some(model_config) = builder.model_config {
1019 let model_config_json = model_config
1020 .map(|mc| serde_json::to_string(&mc))
1021 .transpose()?;
1022 q = q.bind(model_config_json);
1023 }
1024
1025 let mut tx = self.pool.begin().await?;
1026 q = q.bind(&builder.session_id);
1027 q.execute(&mut *tx).await?;
1028
1029 tx.commit().await?;
1030 Ok(())
1031 }
1032
1033 async fn get_conversation(&self, session_id: &str) -> Result<Conversation> {
1034 let rows = sqlx::query_as::<_, (String, String, i64, Option<String>)>(
1035 "SELECT role, content_json, created_timestamp, metadata_json FROM messages WHERE session_id = ? ORDER BY timestamp",
1036 )
1037 .bind(session_id)
1038 .fetch_all(&self.pool)
1039 .await?;
1040
1041 let mut messages = Vec::new();
1042 for (idx, (role_str, content_json, created_timestamp, metadata_json)) in
1043 rows.into_iter().enumerate()
1044 {
1045 let role = match role_str.as_str() {
1046 "user" => Role::User,
1047 "assistant" => Role::Assistant,
1048 _ => continue,
1049 };
1050
1051 let content = serde_json::from_str(&content_json)?;
1052 let metadata = metadata_json
1053 .and_then(|json| serde_json::from_str(&json).ok())
1054 .unwrap_or_default();
1055
1056 let mut message = Message::new(role, created_timestamp, content);
1057 message.metadata = metadata;
1058 message = message.with_id(format!("msg_{}_{}", session_id, idx));
1059 messages.push(message);
1060 }
1061
1062 Ok(Conversation::new_unvalidated(messages))
1063 }
1064
1065 async fn add_message(&self, session_id: &str, message: &Message) -> Result<()> {
1066 let mut tx = self.pool.begin().await?;
1067
1068 let metadata_json = serde_json::to_string(&message.metadata)?;
1069
1070 sqlx::query(
1071 r#"
1072 INSERT INTO messages (session_id, role, content_json, created_timestamp, metadata_json)
1073 VALUES (?, ?, ?, ?, ?)
1074 "#,
1075 )
1076 .bind(session_id)
1077 .bind(role_to_string(&message.role))
1078 .bind(serde_json::to_string(&message.content)?)
1079 .bind(message.created)
1080 .bind(metadata_json)
1081 .execute(&mut *tx)
1082 .await?;
1083
1084 sqlx::query("UPDATE sessions SET updated_at = datetime('now') WHERE id = ?")
1085 .bind(session_id)
1086 .execute(&mut *tx)
1087 .await?;
1088
1089 tx.commit().await?;
1090 Ok(())
1091 }
1092
1093 async fn replace_conversation(
1094 &self,
1095 session_id: &str,
1096 conversation: &Conversation,
1097 ) -> Result<()> {
1098 let mut tx = self.pool.begin().await?;
1099
1100 sqlx::query("DELETE FROM messages WHERE session_id = ?")
1101 .bind(session_id)
1102 .execute(&mut *tx)
1103 .await?;
1104
1105 for message in conversation.messages() {
1106 let metadata_json = serde_json::to_string(&message.metadata)?;
1107
1108 sqlx::query(
1109 r#"
1110 INSERT INTO messages (session_id, role, content_json, created_timestamp, metadata_json)
1111 VALUES (?, ?, ?, ?, ?)
1112 "#,
1113 )
1114 .bind(session_id)
1115 .bind(role_to_string(&message.role))
1116 .bind(serde_json::to_string(&message.content)?)
1117 .bind(message.created)
1118 .bind(metadata_json)
1119 .execute(&mut *tx)
1120 .await?;
1121 }
1122
1123 tx.commit().await?;
1124 Ok(())
1125 }
1126
1127 async fn list_sessions_by_types(&self, types: &[SessionType]) -> Result<Vec<Session>> {
1128 if types.is_empty() {
1129 return Ok(Vec::new());
1130 }
1131
1132 let placeholders: String = types.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
1133 let query = format!(
1134 r#"
1135 SELECT s.id, s.working_dir, s.name, s.description, s.user_set_name, s.session_type, s.created_at, s.updated_at, s.extension_data,
1136 s.total_tokens, s.input_tokens, s.output_tokens,
1137 s.accumulated_total_tokens, s.accumulated_input_tokens, s.accumulated_output_tokens,
1138 s.schedule_id, s.recipe_json, s.user_recipe_values_json,
1139 s.provider_name, s.model_config_json,
1140 COUNT(m.id) as message_count
1141 FROM sessions s
1142 INNER JOIN messages m ON s.id = m.session_id
1143 WHERE s.session_type IN ({})
1144 GROUP BY s.id
1145 ORDER BY s.updated_at DESC
1146 "#,
1147 placeholders
1148 );
1149
1150 let mut q = sqlx::query_as::<_, Session>(&query);
1151 for t in types {
1152 q = q.bind(t.to_string());
1153 }
1154
1155 q.fetch_all(&self.pool).await.map_err(Into::into)
1156 }
1157
1158 async fn list_sessions(&self) -> Result<Vec<Session>> {
1159 self.list_sessions_by_types(&[SessionType::User, SessionType::Scheduled])
1160 .await
1161 }
1162
1163 async fn delete_session(&self, session_id: &str) -> Result<()> {
1164 let mut tx = self.pool.begin().await?;
1165
1166 let exists =
1167 sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM sessions WHERE id = ?)")
1168 .bind(session_id)
1169 .fetch_one(&mut *tx)
1170 .await?;
1171
1172 if !exists {
1173 return Err(anyhow::anyhow!("Session not found"));
1174 }
1175
1176 sqlx::query("DELETE FROM messages WHERE session_id = ?")
1177 .bind(session_id)
1178 .execute(&mut *tx)
1179 .await?;
1180
1181 sqlx::query("DELETE FROM sessions WHERE id = ?")
1182 .bind(session_id)
1183 .execute(&mut *tx)
1184 .await?;
1185
1186 tx.commit().await?;
1187 Ok(())
1188 }
1189
1190 async fn get_insights(&self) -> Result<SessionInsights> {
1191 let row = sqlx::query_as::<_, (i64, Option<i64>)>(
1192 r#"
1193 SELECT COUNT(*) as total_sessions,
1194 COALESCE(SUM(COALESCE(accumulated_total_tokens, total_tokens, 0)), 0) as total_tokens
1195 FROM sessions
1196 "#,
1197 )
1198 .fetch_one(&self.pool)
1199 .await?;
1200
1201 Ok(SessionInsights {
1202 total_sessions: row.0 as usize,
1203 total_tokens: row.1.unwrap_or(0),
1204 })
1205 }
1206
1207 async fn export_session(&self, id: &str) -> Result<String> {
1208 let session = self.get_session(id, true).await?;
1209 serde_json::to_string_pretty(&session).map_err(Into::into)
1210 }
1211
1212 async fn import_session(&self, json: &str) -> Result<Session> {
1213 let import: Session = serde_json::from_str(json)?;
1214
1215 let session = self
1216 .create_session(
1217 import.working_dir.clone(),
1218 import.name.clone(),
1219 import.session_type,
1220 )
1221 .await?;
1222
1223 let mut builder = SessionUpdateBuilder::new(session.id.clone())
1224 .extension_data(import.extension_data)
1225 .total_tokens(import.total_tokens)
1226 .input_tokens(import.input_tokens)
1227 .output_tokens(import.output_tokens)
1228 .accumulated_total_tokens(import.accumulated_total_tokens)
1229 .accumulated_input_tokens(import.accumulated_input_tokens)
1230 .accumulated_output_tokens(import.accumulated_output_tokens)
1231 .schedule_id(import.schedule_id)
1232 .recipe(import.recipe)
1233 .user_recipe_values(import.user_recipe_values);
1234
1235 if import.user_set_name {
1236 builder = builder.user_provided_name(import.name.clone());
1237 }
1238
1239 self.apply_update(builder).await?;
1240
1241 if let Some(conversation) = import.conversation {
1242 self.replace_conversation(&session.id, &conversation)
1243 .await?;
1244 }
1245
1246 self.get_session(&session.id, true).await
1247 }
1248
1249 async fn copy_session(&self, session_id: &str, new_name: String) -> Result<Session> {
1250 let original_session = self.get_session(session_id, true).await?;
1251
1252 let new_session = self
1253 .create_session(
1254 original_session.working_dir.clone(),
1255 new_name,
1256 original_session.session_type,
1257 )
1258 .await?;
1259
1260 let builder = SessionUpdateBuilder::new(new_session.id.clone())
1261 .extension_data(original_session.extension_data)
1262 .schedule_id(original_session.schedule_id)
1263 .recipe(original_session.recipe)
1264 .user_recipe_values(original_session.user_recipe_values);
1265
1266 self.apply_update(builder).await?;
1267
1268 if let Some(conversation) = original_session.conversation {
1269 self.replace_conversation(&new_session.id, &conversation)
1270 .await?;
1271 }
1272
1273 self.get_session(&new_session.id, true).await
1274 }
1275
1276 async fn truncate_conversation(&self, session_id: &str, timestamp: i64) -> Result<()> {
1277 sqlx::query("DELETE FROM messages WHERE session_id = ? AND created_timestamp >= ?")
1278 .bind(session_id)
1279 .bind(timestamp)
1280 .execute(&self.pool)
1281 .await?;
1282
1283 Ok(())
1284 }
1285
1286 async fn search_chat_history(
1287 &self,
1288 query: &str,
1289 limit: Option<usize>,
1290 after_date: Option<chrono::DateTime<chrono::Utc>>,
1291 before_date: Option<chrono::DateTime<chrono::Utc>>,
1292 exclude_session_id: Option<String>,
1293 ) -> Result<crate::session::chat_history_search::ChatRecallResults> {
1294 use crate::session::chat_history_search::ChatHistorySearch;
1295
1296 ChatHistorySearch::new(
1297 &self.pool,
1298 query,
1299 limit,
1300 after_date,
1301 before_date,
1302 exclude_session_id,
1303 )
1304 .execute()
1305 .await
1306 }
1307}
1308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::conversation::message::{Message, MessageContent};
    use tempfile::TempDir;

    const NUM_SESSIONS: i32 = 10;

    /// Builds a message for `role` with a single text content item, stamped with
    /// the current time in milliseconds.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            id: None,
            role,
            created: chrono::Utc::now().timestamp_millis(),
            content: vec![MessageContent::text(text)],
            metadata: Default::default(),
        }
    }

    /// Creates a `SessionStorage` backed by `file_name` inside `dir`.
    async fn open_storage(dir: &TempDir, file_name: &str) -> Arc<SessionStorage> {
        let db_path = dir.path().join(file_name);
        Arc::new(SessionStorage::create(&db_path).await.unwrap())
    }

    /// Creates `NUM_SESSIONS` sessions one after another (despite the test name,
    /// nothing here runs concurrently), gives each two messages and an update,
    /// and checks the per-session state, the listing and the aggregate insights.
    #[tokio::test]
    async fn test_concurrent_session_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_sessions.db").await;

        let mut created_ids = vec![];
        for i in 0..NUM_SESSIONS {
            let session = storage
                .create_session(
                    PathBuf::from(format!("/tmp/test_{}", i)),
                    format!("Test session {}", i),
                    SessionType::User,
                )
                .await
                .unwrap();

            for (role, text) in [(Role::User, "hello world"), (Role::Assistant, "sup world?")] {
                storage
                    .add_message(&session.id, &text_message(role, text))
                    .await
                    .unwrap();
            }

            let update = SessionUpdateBuilder::new(session.id.clone())
                .user_provided_name(format!("Updated session {}", i))
                .total_tokens(Some(100 * i));
            storage.apply_update(update).await.unwrap();

            let reloaded = storage.get_session(&session.id, true).await.unwrap();
            assert_eq!(reloaded.message_count, 2);
            assert_eq!(reloaded.total_tokens, Some(100 * i));

            created_ids.push(session.id);
        }

        assert_eq!(created_ids.len(), NUM_SESSIONS as usize);

        let distinct_ids: std::collections::HashSet<_> = created_ids.iter().collect();
        assert_eq!(distinct_ids.len(), NUM_SESSIONS as usize);

        let listed = storage.list_sessions().await.unwrap();
        assert_eq!(listed.len(), NUM_SESSIONS as usize);
        for session in &listed {
            assert_eq!(session.message_count, 2);
            assert!(session.name.starts_with("Updated session"));
        }

        let insights = storage.get_insights().await.unwrap();
        assert_eq!(insights.total_sessions, NUM_SESSIONS as usize);
        // Session `i` was given `100 * i` tokens, so the total is 100 * (0 + 1 + ... + (N - 1)).
        let expected_tokens = 100 * NUM_SESSIONS * (NUM_SESSIONS - 1) / 2;
        assert_eq!(insights.total_tokens, expected_tokens as i64);
    }

    /// Exports a populated session, imports the JSON again and checks that the
    /// import is a distinct session carrying the same name, working directory,
    /// token counters and messages.
    #[tokio::test]
    async fn test_export_import_roundtrip() {
        const DESCRIPTION: &str = "Original session";
        const TOTAL_TOKENS: i32 = 500;
        const INPUT_TOKENS: i32 = 300;
        const OUTPUT_TOKENS: i32 = 200;
        const ACCUMULATED_TOKENS: i32 = 1000;
        const USER_MESSAGE: &str = "test message";
        const ASSISTANT_MESSAGE: &str = "test response";

        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_export.db").await;

        let original = storage
            .create_session(
                PathBuf::from("/tmp/test"),
                DESCRIPTION.to_string(),
                SessionType::User,
            )
            .await
            .unwrap();

        let token_update = SessionUpdateBuilder::new(original.id.clone())
            .total_tokens(Some(TOTAL_TOKENS))
            .input_tokens(Some(INPUT_TOKENS))
            .output_tokens(Some(OUTPUT_TOKENS))
            .accumulated_total_tokens(Some(ACCUMULATED_TOKENS));
        storage.apply_update(token_update).await.unwrap();

        storage
            .add_message(&original.id, &text_message(Role::User, USER_MESSAGE))
            .await
            .unwrap();
        storage
            .add_message(
                &original.id,
                &text_message(Role::Assistant, ASSISTANT_MESSAGE),
            )
            .await
            .unwrap();

        let exported = storage.export_session(&original.id).await.unwrap();
        let imported = storage.import_session(&exported).await.unwrap();

        assert_ne!(imported.id, original.id);
        assert_eq!(imported.name, DESCRIPTION);
        assert_eq!(imported.working_dir, PathBuf::from("/tmp/test"));
        assert_eq!(imported.total_tokens, Some(TOTAL_TOKENS));
        assert_eq!(imported.input_tokens, Some(INPUT_TOKENS));
        assert_eq!(imported.output_tokens, Some(OUTPUT_TOKENS));
        assert_eq!(imported.accumulated_total_tokens, Some(ACCUMULATED_TOKENS));
        assert_eq!(imported.message_count, 2);

        let conversation = imported.conversation.unwrap();
        let messages = conversation.messages();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].role, Role::User);
        assert_eq!(messages[1].role, Role::Assistant);
    }

    /// Imports JSON that carries the session name under the older `description`
    /// key and checks that it ends up as the imported session's `name`.
    #[tokio::test]
    async fn test_import_session_with_description_field() {
        const OLD_FORMAT_JSON: &str = r#"{
            "id": "20240101_1",
            "description": "Old format session",
            "user_set_name": true,
            "working_dir": "/tmp/test",
            "created_at": "2024-01-01T00:00:00Z",
            "updated_at": "2024-01-01T00:00:00Z",
            "extension_data": {},
            "message_count": 0
        }"#;

        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_import.db").await;

        let imported = storage.import_session(OLD_FORMAT_JSON).await.unwrap();

        assert_eq!(imported.name, "Old format session");
        assert!(imported.user_set_name);
        assert_eq!(imported.working_dir, PathBuf::from("/tmp/test"));
    }
}