Skip to main content

aster/session/
session_manager.rs

1use crate::config::paths::Paths;
2use crate::conversation::message::Message;
3use crate::conversation::Conversation;
4use crate::model::ModelConfig;
5use crate::providers::base::{Provider, MSG_COUNT_FOR_SESSION_NAME_GENERATION};
6use crate::recipe::Recipe;
7use crate::session::extension_data::ExtensionData;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use rmcp::model::Role;
11use serde::{Deserialize, Serialize};
12use sqlx::sqlite::SqliteConnectOptions;
13use sqlx::{Pool, Sqlite};
14use std::collections::HashMap;
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use tokio::sync::OnceCell;
19use tracing::{info, warn};
20use utoipa::ToSchema;
21
/// Schema version stamped into a freshly created database and used as the
/// upper bound when applying migrations to an existing one.
pub const CURRENT_SCHEMA_VERSION: i32 = 6;
/// Name of the directory, joined onto the application data directory, that
/// holds session storage.
pub const SESSIONS_FOLDER: &str = "sessions";
/// File name of the SQLite database inside the sessions directory.
pub const DB_NAME: &str = "sessions.db";
25
/// Category a session belongs to.
///
/// Serialized by serde using snake_case variant names; `User` is the default.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SessionType {
    #[default]
    User,
    Scheduled,
    SubAgent,
    Hidden,
    Terminal,
}
36
37impl std::fmt::Display for SessionType {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            SessionType::User => write!(f, "user"),
41            SessionType::SubAgent => write!(f, "sub_agent"),
42            SessionType::Hidden => write!(f, "hidden"),
43            SessionType::Scheduled => write!(f, "scheduled"),
44            SessionType::Terminal => write!(f, "terminal"),
45        }
46    }
47}
48
49impl std::str::FromStr for SessionType {
50    type Err = anyhow::Error;
51
52    fn from_str(s: &str) -> Result<Self, Self::Err> {
53        match s {
54            "user" => Ok(SessionType::User),
55            "sub_agent" => Ok(SessionType::SubAgent),
56            "hidden" => Ok(SessionType::Hidden),
57            "scheduled" => Ok(SessionType::Scheduled),
58            "terminal" => Ok(SessionType::Terminal),
59            _ => Err(anyhow::anyhow!("Invalid session type: {}", s)),
60        }
61    }
62}
63
/// Shared storage handle, lazily initialized on the first call to
/// `SessionManager::instance` and reused afterwards.
static SESSION_STORAGE: OnceCell<Arc<SessionStorage>> = OnceCell::const_new();
65
/// A stored session together with its metadata and, optionally, its messages.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Session {
    /// Session identifier (primary key of the `sessions` table).
    pub id: String,
    /// Working directory recorded for the session.
    #[schema(value_type = String)]
    pub working_dir: PathBuf,
    /// Session name; the key `description` is also accepted when deserializing.
    #[serde(alias = "description")]
    pub name: String,
    /// `true` when the name was set through `user_provided_name`;
    /// `maybe_update_name` does not overwrite such names.
    #[serde(default)]
    pub user_set_name: bool,
    /// Category of the session; defaults to `SessionType::User` when absent.
    #[serde(default)]
    pub session_type: SessionType,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Extension state; persisted as JSON text in the `extension_data` column.
    pub extension_data: ExtensionData,
    // Token counters, stored as nullable INTEGER columns of the same names.
    pub total_tokens: Option<i32>,
    pub input_tokens: Option<i32>,
    pub output_tokens: Option<i32>,
    pub accumulated_total_tokens: Option<i32>,
    pub accumulated_input_tokens: Option<i32>,
    pub accumulated_output_tokens: Option<i32>,
    pub schedule_id: Option<String>,
    /// Recipe attached to the session; persisted as JSON in `recipe_json`.
    pub recipe: Option<Recipe>,
    /// Persisted as JSON in `user_recipe_values_json`.
    pub user_recipe_values: Option<HashMap<String, String>>,
    /// Messages of the session; `None` unless they were explicitly loaded
    /// (for example by `get_session` with `include_messages = true`).
    pub conversation: Option<Conversation>,
    /// Number of messages belonging to the session.
    pub message_count: usize,
    pub provider_name: Option<String>,
    /// Persisted as JSON in `model_config_json`.
    pub model_config: Option<ModelConfig>,
}
94
/// Accumulates a partial update for one session.
///
/// Every field other than `session_id` is wrapped in an outer `Option`:
/// `None` means "leave this column alone", `Some(..)` means the column is
/// part of the generated `UPDATE` statement. For fields typed
/// `Option<Option<T>>` the inner `Option` is the value itself —
/// NOTE(review): presumably an inner `None` stores NULL; confirm against the
/// binding code in `apply_update`.
pub struct SessionUpdateBuilder {
    /// Identifier of the session the update targets.
    session_id: String,
    name: Option<String>,
    user_set_name: Option<bool>,
    session_type: Option<SessionType>,
    working_dir: Option<PathBuf>,
    extension_data: Option<ExtensionData>,
    total_tokens: Option<Option<i32>>,
    input_tokens: Option<Option<i32>>,
    output_tokens: Option<Option<i32>>,
    accumulated_total_tokens: Option<Option<i32>>,
    accumulated_input_tokens: Option<Option<i32>>,
    accumulated_output_tokens: Option<Option<i32>>,
    schedule_id: Option<Option<String>>,
    recipe: Option<Option<Recipe>>,
    user_recipe_values: Option<Option<HashMap<String, String>>>,
    provider_name: Option<Option<String>>,
    model_config: Option<Option<ModelConfig>>,
}
114
/// Aggregate figures returned by `SessionManager::get_insights`.
///
/// Serialized with camelCase field names.
#[derive(Serialize, ToSchema, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SessionInsights {
    pub total_sessions: usize,
    pub total_tokens: i64,
}
121
122impl SessionUpdateBuilder {
123    fn new(session_id: String) -> Self {
124        Self {
125            session_id,
126            name: None,
127            user_set_name: None,
128            session_type: None,
129            working_dir: None,
130            extension_data: None,
131            total_tokens: None,
132            input_tokens: None,
133            output_tokens: None,
134            accumulated_total_tokens: None,
135            accumulated_input_tokens: None,
136            accumulated_output_tokens: None,
137            schedule_id: None,
138            recipe: None,
139            user_recipe_values: None,
140            provider_name: None,
141            model_config: None,
142        }
143    }
144
145    pub fn user_provided_name(mut self, name: impl Into<String>) -> Self {
146        let name = name.into().trim().to_string();
147        if !name.is_empty() {
148            self.name = Some(name);
149            self.user_set_name = Some(true);
150        }
151        self
152    }
153
154    pub fn system_generated_name(mut self, name: impl Into<String>) -> Self {
155        let name = name.into().trim().to_string();
156        if !name.is_empty() {
157            self.name = Some(name);
158            self.user_set_name = Some(false);
159        }
160        self
161    }
162
163    pub fn session_type(mut self, session_type: SessionType) -> Self {
164        self.session_type = Some(session_type);
165        self
166    }
167
168    pub fn working_dir(mut self, working_dir: PathBuf) -> Self {
169        self.working_dir = Some(working_dir);
170        self
171    }
172
173    pub fn extension_data(mut self, data: ExtensionData) -> Self {
174        self.extension_data = Some(data);
175        self
176    }
177
178    pub fn total_tokens(mut self, tokens: Option<i32>) -> Self {
179        self.total_tokens = Some(tokens);
180        self
181    }
182
183    pub fn input_tokens(mut self, tokens: Option<i32>) -> Self {
184        self.input_tokens = Some(tokens);
185        self
186    }
187
188    pub fn output_tokens(mut self, tokens: Option<i32>) -> Self {
189        self.output_tokens = Some(tokens);
190        self
191    }
192
193    pub fn accumulated_total_tokens(mut self, tokens: Option<i32>) -> Self {
194        self.accumulated_total_tokens = Some(tokens);
195        self
196    }
197
198    pub fn accumulated_input_tokens(mut self, tokens: Option<i32>) -> Self {
199        self.accumulated_input_tokens = Some(tokens);
200        self
201    }
202
203    pub fn accumulated_output_tokens(mut self, tokens: Option<i32>) -> Self {
204        self.accumulated_output_tokens = Some(tokens);
205        self
206    }
207
208    pub fn schedule_id(mut self, schedule_id: Option<String>) -> Self {
209        self.schedule_id = Some(schedule_id);
210        self
211    }
212
213    pub fn recipe(mut self, recipe: Option<Recipe>) -> Self {
214        self.recipe = Some(recipe);
215        self
216    }
217
218    pub fn user_recipe_values(
219        mut self,
220        user_recipe_values: Option<HashMap<String, String>>,
221    ) -> Self {
222        self.user_recipe_values = Some(user_recipe_values);
223        self
224    }
225
226    pub fn provider_name(mut self, provider_name: impl Into<String>) -> Self {
227        self.provider_name = Some(Some(provider_name.into()));
228        self
229    }
230
231    pub fn model_config(mut self, model_config: ModelConfig) -> Self {
232        self.model_config = Some(Some(model_config));
233        self
234    }
235
236    pub async fn apply(self) -> Result<()> {
237        SessionManager::apply_update(self).await
238    }
239}
240
241pub struct SessionManager;
242
243impl SessionManager {
244    pub async fn instance() -> Result<Arc<SessionStorage>> {
245        SESSION_STORAGE
246            .get_or_try_init(|| async { SessionStorage::new().await.map(Arc::new) })
247            .await
248            .map(Arc::clone)
249    }
250
251    pub async fn create_session(
252        working_dir: PathBuf,
253        name: String,
254        session_type: SessionType,
255    ) -> Result<Session> {
256        Self::instance()
257            .await?
258            .create_session(working_dir, name, session_type)
259            .await
260    }
261
262    pub async fn get_session(id: &str, include_messages: bool) -> Result<Session> {
263        Self::instance()
264            .await?
265            .get_session(id, include_messages)
266            .await
267    }
268
269    pub fn update_session(id: &str) -> SessionUpdateBuilder {
270        SessionUpdateBuilder::new(id.to_string())
271    }
272
273    async fn apply_update(builder: SessionUpdateBuilder) -> Result<()> {
274        Self::instance().await?.apply_update(builder).await
275    }
276
277    pub async fn add_message(id: &str, message: &Message) -> Result<()> {
278        Self::instance().await?.add_message(id, message).await
279    }
280
281    pub async fn replace_conversation(id: &str, conversation: &Conversation) -> Result<()> {
282        Self::instance()
283            .await?
284            .replace_conversation(id, conversation)
285            .await
286    }
287
288    pub async fn list_sessions() -> Result<Vec<Session>> {
289        Self::instance().await?.list_sessions().await
290    }
291
292    pub async fn list_sessions_by_types(types: &[SessionType]) -> Result<Vec<Session>> {
293        Self::instance().await?.list_sessions_by_types(types).await
294    }
295
296    pub async fn delete_session(id: &str) -> Result<()> {
297        Self::instance().await?.delete_session(id).await
298    }
299
300    pub async fn get_insights() -> Result<SessionInsights> {
301        Self::instance().await?.get_insights().await
302    }
303
304    pub async fn export_session(id: &str) -> Result<String> {
305        Self::instance().await?.export_session(id).await
306    }
307
308    pub async fn import_session(json: &str) -> Result<Session> {
309        Self::instance().await?.import_session(json).await
310    }
311
312    pub async fn copy_session(session_id: &str, new_name: String) -> Result<Session> {
313        Self::instance()
314            .await?
315            .copy_session(session_id, new_name)
316            .await
317    }
318
319    pub async fn truncate_conversation(session_id: &str, timestamp: i64) -> Result<()> {
320        Self::instance()
321            .await?
322            .truncate_conversation(session_id, timestamp)
323            .await
324    }
325
326    pub async fn maybe_update_name(id: &str, provider: Arc<dyn Provider>) -> Result<()> {
327        let session = Self::get_session(id, true).await?;
328
329        if session.user_set_name {
330            return Ok(());
331        }
332
333        let conversation = session
334            .conversation
335            .ok_or_else(|| anyhow::anyhow!("No messages found"))?;
336
337        let user_message_count = conversation
338            .messages()
339            .iter()
340            .filter(|m| matches!(m.role, Role::User))
341            .count();
342
343        if user_message_count <= MSG_COUNT_FOR_SESSION_NAME_GENERATION {
344            let name = provider.generate_session_name(&conversation).await?;
345            Self::update_session(id)
346                .system_generated_name(name)
347                .apply()
348                .await
349        } else {
350            Ok(())
351        }
352    }
353
354    pub async fn search_chat_history(
355        query: &str,
356        limit: Option<usize>,
357        after_date: Option<chrono::DateTime<chrono::Utc>>,
358        before_date: Option<chrono::DateTime<chrono::Utc>>,
359        exclude_session_id: Option<String>,
360    ) -> Result<crate::session::chat_history_search::ChatRecallResults> {
361        Self::instance()
362            .await?
363            .search_chat_history(query, limit, after_date, before_date, exclude_session_id)
364            .await
365    }
366}
367
/// Session store backed by a SQLite connection pool.
pub struct SessionStorage {
    /// Pool used for every query issued by the storage.
    pool: Pool<Sqlite>,
}
371
372pub fn ensure_session_dir() -> Result<PathBuf> {
373    let session_dir = Paths::data_dir().join(SESSIONS_FOLDER);
374
375    if !session_dir.exists() {
376        fs::create_dir_all(&session_dir)?;
377    }
378
379    Ok(session_dir)
380}
381
382fn role_to_string(role: &Role) -> &'static str {
383    match role {
384        Role::User => "user",
385        Role::Assistant => "assistant",
386    }
387}
388
389impl Default for Session {
390    fn default() -> Self {
391        Self {
392            id: String::new(),
393            working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
394            name: String::new(),
395            user_set_name: false,
396            session_type: SessionType::default(),
397            created_at: Default::default(),
398            updated_at: Default::default(),
399            extension_data: ExtensionData::default(),
400            total_tokens: None,
401            input_tokens: None,
402            output_tokens: None,
403            accumulated_total_tokens: None,
404            accumulated_input_tokens: None,
405            accumulated_output_tokens: None,
406            schedule_id: None,
407            recipe: None,
408            user_recipe_values: None,
409            conversation: None,
410            message_count: 0,
411            provider_name: None,
412            model_config: None,
413        }
414    }
415}
416
417impl Session {
418    pub fn without_messages(mut self) -> Self {
419        self.conversation = None;
420        self
421    }
422}
423
424impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for Session {
425    fn from_row(row: &sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
426        use sqlx::Row;
427
428        let recipe_json: Option<String> = row.try_get("recipe_json")?;
429        let recipe = recipe_json.and_then(|json| serde_json::from_str(&json).ok());
430
431        let user_recipe_values_json: Option<String> = row.try_get("user_recipe_values_json")?;
432        let user_recipe_values =
433            user_recipe_values_json.and_then(|json| serde_json::from_str(&json).ok());
434
435        let model_config_json: Option<String> = row.try_get("model_config_json").ok().flatten();
436        let model_config = model_config_json.and_then(|json| serde_json::from_str(&json).ok());
437
438        let name: String = {
439            let name_val: String = row.try_get("name").unwrap_or_default();
440            if !name_val.is_empty() {
441                name_val
442            } else {
443                row.try_get("description").unwrap_or_default()
444            }
445        };
446
447        let user_set_name = row.try_get("user_set_name").unwrap_or(false);
448
449        let session_type_str: String = row
450            .try_get("session_type")
451            .unwrap_or_else(|_| "user".to_string());
452        let session_type = session_type_str.parse().unwrap_or_default();
453
454        Ok(Session {
455            id: row.try_get("id")?,
456            working_dir: PathBuf::from(row.try_get::<String, _>("working_dir")?),
457            name,
458            user_set_name,
459            session_type,
460            created_at: row.try_get("created_at")?,
461            updated_at: row.try_get("updated_at")?,
462            extension_data: serde_json::from_str(&row.try_get::<String, _>("extension_data")?)
463                .unwrap_or_default(),
464            total_tokens: row.try_get("total_tokens")?,
465            input_tokens: row.try_get("input_tokens")?,
466            output_tokens: row.try_get("output_tokens")?,
467            accumulated_total_tokens: row.try_get("accumulated_total_tokens")?,
468            accumulated_input_tokens: row.try_get("accumulated_input_tokens")?,
469            accumulated_output_tokens: row.try_get("accumulated_output_tokens")?,
470            schedule_id: row.try_get("schedule_id")?,
471            recipe,
472            user_recipe_values,
473            conversation: None,
474            message_count: row.try_get("message_count").unwrap_or(0) as usize,
475            provider_name: row.try_get("provider_name").ok().flatten(),
476            model_config,
477        })
478    }
479}
480
481impl SessionStorage {
482    async fn new() -> Result<Self> {
483        let session_dir = ensure_session_dir()?;
484        let db_path = session_dir.join(DB_NAME);
485
486        let storage = if db_path.exists() {
487            Self::open(&db_path).await?
488        } else {
489            let storage = Self::create(&db_path).await?;
490
491            if let Err(e) = storage.import_legacy(&session_dir).await {
492                warn!("Failed to import some legacy sessions: {}", e);
493            }
494
495            storage
496        };
497
498        Ok(storage)
499    }
500
501    async fn get_pool(db_path: &Path, create_if_missing: bool) -> Result<Pool<Sqlite>> {
502        let options = SqliteConnectOptions::new()
503            .filename(db_path)
504            .create_if_missing(create_if_missing)
505            .busy_timeout(std::time::Duration::from_secs(5))
506            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
507
508        sqlx::SqlitePool::connect_with(options).await.map_err(|e| {
509            anyhow::anyhow!(
510                "Failed to open SQLite database at '{}': {}",
511                db_path.display(),
512                e
513            )
514        })
515    }
516
517    async fn open(db_path: &Path) -> Result<Self> {
518        let pool = Self::get_pool(db_path, false).await?;
519
520        let storage = Self { pool };
521        storage.run_migrations().await?;
522        Ok(storage)
523    }
524
525    async fn create(db_path: &Path) -> Result<Self> {
526        let pool = Self::get_pool(db_path, true).await?;
527
528        sqlx::query(
529            r#"
530            CREATE TABLE schema_version (
531                version INTEGER PRIMARY KEY,
532                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
533            )
534        "#,
535        )
536        .execute(&pool)
537        .await?;
538
539        sqlx::query("INSERT INTO schema_version (version) VALUES (?)")
540            .bind(CURRENT_SCHEMA_VERSION)
541            .execute(&pool)
542            .await?;
543
544        sqlx::query(
545            r#"
546            CREATE TABLE sessions (
547                id TEXT PRIMARY KEY,
548                name TEXT NOT NULL DEFAULT '',
549                description TEXT NOT NULL DEFAULT '',
550                user_set_name BOOLEAN DEFAULT FALSE,
551                session_type TEXT NOT NULL DEFAULT 'user',
552                working_dir TEXT NOT NULL,
553                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
554                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
555                extension_data TEXT DEFAULT '{}',
556                total_tokens INTEGER,
557                input_tokens INTEGER,
558                output_tokens INTEGER,
559                accumulated_total_tokens INTEGER,
560                accumulated_input_tokens INTEGER,
561                accumulated_output_tokens INTEGER,
562                schedule_id TEXT,
563                recipe_json TEXT,
564                user_recipe_values_json TEXT,
565                provider_name TEXT,
566                model_config_json TEXT
567            )
568        "#,
569        )
570        .execute(&pool)
571        .await?;
572
573        sqlx::query(
574            r#"
575            CREATE TABLE messages (
576                id INTEGER PRIMARY KEY AUTOINCREMENT,
577                session_id TEXT NOT NULL REFERENCES sessions(id),
578                role TEXT NOT NULL,
579                content_json TEXT NOT NULL,
580                created_timestamp INTEGER NOT NULL,
581                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
582                tokens INTEGER,
583                metadata_json TEXT
584            )
585        "#,
586        )
587        .execute(&pool)
588        .await?;
589
590        sqlx::query("CREATE INDEX idx_messages_session ON messages(session_id)")
591            .execute(&pool)
592            .await?;
593        sqlx::query("CREATE INDEX idx_messages_timestamp ON messages(timestamp)")
594            .execute(&pool)
595            .await?;
596        sqlx::query("CREATE INDEX idx_sessions_updated ON sessions(updated_at DESC)")
597            .execute(&pool)
598            .await?;
599        sqlx::query("CREATE INDEX idx_sessions_type ON sessions(session_type)")
600            .execute(&pool)
601            .await?;
602
603        Ok(Self { pool })
604    }
605
606    async fn import_legacy(&self, session_dir: &PathBuf) -> Result<()> {
607        use crate::session::legacy;
608
609        let sessions = match legacy::list_sessions(session_dir) {
610            Ok(sessions) => sessions,
611            Err(_) => {
612                warn!("No legacy sessions found to import");
613                return Ok(());
614            }
615        };
616
617        if sessions.is_empty() {
618            return Ok(());
619        }
620
621        let mut imported_count = 0;
622        let mut failed_count = 0;
623
624        for (session_name, session_path) in sessions {
625            match legacy::load_session(&session_name, &session_path) {
626                Ok(session) => match self.import_legacy_session(&session).await {
627                    Ok(_) => {
628                        imported_count += 1;
629                        info!("  ✓ Imported: {}", session_name);
630                    }
631                    Err(e) => {
632                        failed_count += 1;
633                        info!("  ✗ Failed to import {}: {}", session_name, e);
634                    }
635                },
636                Err(e) => {
637                    failed_count += 1;
638                    info!("  ✗ Failed to load {}: {}", session_name, e);
639                }
640            }
641        }
642
643        info!(
644            "Import complete: {} successful, {} failed",
645            imported_count, failed_count
646        );
647        Ok(())
648    }
649
650    async fn import_legacy_session(&self, session: &Session) -> Result<()> {
651        let mut tx = self.pool.begin().await?;
652
653        let recipe_json = match &session.recipe {
654            Some(recipe) => Some(serde_json::to_string(recipe)?),
655            None => None,
656        };
657
658        let user_recipe_values_json = match &session.user_recipe_values {
659            Some(user_recipe_values) => Some(serde_json::to_string(user_recipe_values)?),
660            None => None,
661        };
662
663        let model_config_json = match &session.model_config {
664            Some(model_config) => Some(serde_json::to_string(model_config)?),
665            None => None,
666        };
667
668        sqlx::query(
669            r#"
670        INSERT INTO sessions (
671            id, name, user_set_name, session_type, working_dir, created_at, updated_at, extension_data,
672            total_tokens, input_tokens, output_tokens,
673            accumulated_total_tokens, accumulated_input_tokens, accumulated_output_tokens,
674            schedule_id, recipe_json, user_recipe_values_json,
675            provider_name, model_config_json
676        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
677        "#,
678        )
679            .bind(&session.id)
680            .bind(&session.name)
681            .bind(session.user_set_name)
682            .bind(session.session_type.to_string())
683            .bind(session.working_dir.to_string_lossy().as_ref())
684            .bind(session.created_at)
685            .bind(session.updated_at)
686            .bind(serde_json::to_string(&session.extension_data)?)
687            .bind(session.total_tokens)
688            .bind(session.input_tokens)
689            .bind(session.output_tokens)
690            .bind(session.accumulated_total_tokens)
691            .bind(session.accumulated_input_tokens)
692            .bind(session.accumulated_output_tokens)
693            .bind(&session.schedule_id)
694            .bind(recipe_json)
695            .bind(user_recipe_values_json)
696            .bind(&session.provider_name)
697            .bind(model_config_json)
698            .execute(&mut *tx)
699            .await?;
700
701        tx.commit().await?;
702
703        if let Some(conversation) = &session.conversation {
704            self.replace_conversation(&session.id, conversation).await?;
705        }
706        Ok(())
707    }
708
709    async fn run_migrations(&self) -> Result<()> {
710        let current_version = self.get_schema_version().await?;
711
712        if current_version < CURRENT_SCHEMA_VERSION {
713            info!(
714                "Running database migrations from v{} to v{}...",
715                current_version, CURRENT_SCHEMA_VERSION
716            );
717
718            for version in (current_version + 1)..=CURRENT_SCHEMA_VERSION {
719                info!("  Applying migration v{}...", version);
720                self.apply_migration(version).await?;
721                self.update_schema_version(version).await?;
722                info!("  ✓ Migration v{} complete", version);
723            }
724
725            info!("All migrations complete");
726        }
727
728        Ok(())
729    }
730
731    async fn get_schema_version(&self) -> Result<i32> {
732        let table_exists = sqlx::query_scalar::<_, bool>(
733            r#"
734            SELECT EXISTS (
735                SELECT name FROM sqlite_master
736                WHERE type='table' AND name='schema_version'
737            )
738        "#,
739        )
740        .fetch_one(&self.pool)
741        .await?;
742
743        if !table_exists {
744            return Ok(0);
745        }
746
747        let version = sqlx::query_scalar::<_, i32>("SELECT MAX(version) FROM schema_version")
748            .fetch_one(&self.pool)
749            .await?;
750
751        Ok(version)
752    }
753
754    async fn update_schema_version(&self, version: i32) -> Result<()> {
755        sqlx::query("INSERT INTO schema_version (version) VALUES (?)")
756            .bind(version)
757            .execute(&self.pool)
758            .await?;
759        Ok(())
760    }
761
    /// Applies the schema change belonging to a single migration `version`.
    ///
    /// Each statement is executed directly against the pool; this function
    /// does not record the version as applied (the caller does that).
    ///
    /// # Errors
    /// Fails when a statement fails, or with "Unknown migration version" for
    /// a version that has no migration defined here.
    async fn apply_migration(&self, version: i32) -> Result<()> {
        match version {
            // v1: introduce the schema_version bookkeeping table.
            1 => {
                sqlx::query(
                    r#"
                    CREATE TABLE IF NOT EXISTS schema_version (
                        version INTEGER PRIMARY KEY,
                        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                "#,
                )
                .execute(&self.pool)
                .await?;
            }
            // v2: sessions gain the user_recipe_values_json column.
            2 => {
                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN user_recipe_values_json TEXT
                "#,
                )
                .execute(&self.pool)
                .await?;
            }
            // v3: messages gain the metadata_json column.
            3 => {
                sqlx::query(
                    r#"
                    ALTER TABLE messages ADD COLUMN metadata_json TEXT
                "#,
                )
                .execute(&self.pool)
                .await?;
            }
            // v4: sessions gain the name and user_set_name columns.
            4 => {
                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN name TEXT DEFAULT ''
                "#,
                )
                .execute(&self.pool)
                .await?;

                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN user_set_name BOOLEAN DEFAULT FALSE
                "#,
                )
                .execute(&self.pool)
                .await?;
            }
            // v5: sessions gain the session_type column plus an index on it.
            5 => {
                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN session_type TEXT NOT NULL DEFAULT 'user'
                "#,
                )
                .execute(&self.pool)
                .await?;

                sqlx::query("CREATE INDEX idx_sessions_type ON sessions(session_type)")
                    .execute(&self.pool)
                    .await?;
            }
            // v6: sessions gain the provider_name and model_config_json columns.
            6 => {
                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN provider_name TEXT
                "#,
                )
                .execute(&self.pool)
                .await?;

                sqlx::query(
                    r#"
                    ALTER TABLE sessions ADD COLUMN model_config_json TEXT
                "#,
                )
                .execute(&self.pool)
                .await?;
            }
            _ => {
                anyhow::bail!("Unknown migration version: {}", version);
            }
        }

        Ok(())
    }
848
849    async fn create_session(
850        &self,
851        working_dir: PathBuf,
852        name: String,
853        session_type: SessionType,
854    ) -> Result<Session> {
855        let mut tx = self.pool.begin().await?;
856
857        let today = chrono::Utc::now().format("%Y%m%d").to_string();
858        let session = sqlx::query_as(
859            r#"
860                INSERT INTO sessions (id, name, user_set_name, session_type, working_dir, extension_data)
861                VALUES (
862                    ? || '_' || CAST(COALESCE((
863                        SELECT MAX(CAST(SUBSTR(id, 10) AS INTEGER))
864                        FROM sessions
865                        WHERE id LIKE ? || '_%'
866                    ), 0) + 1 AS TEXT),
867                    ?,
868                    FALSE,
869                    ?,
870                    ?,
871                    '{}'
872                )
873                RETURNING *
874                "#,
875        )
876            .bind(&today)
877            .bind(&today)
878            .bind(&name)
879            .bind(session_type.to_string())
880            .bind(working_dir.to_string_lossy().as_ref())
881            .fetch_one(&mut *tx)
882            .await?;
883
884        tx.commit().await?;
885        crate::posthog::emit_session_started();
886        Ok(session)
887    }
888
889    async fn get_session(&self, id: &str, include_messages: bool) -> Result<Session> {
890        let mut session = sqlx::query_as::<_, Session>(
891            r#"
892        SELECT id, working_dir, name, description, user_set_name, session_type, created_at, updated_at, extension_data,
893               total_tokens, input_tokens, output_tokens,
894               accumulated_total_tokens, accumulated_input_tokens, accumulated_output_tokens,
895               schedule_id, recipe_json, user_recipe_values_json,
896               provider_name, model_config_json
897        FROM sessions
898        WHERE id = ?
899    "#,
900        )
901            .bind(id)
902            .fetch_optional(&self.pool)
903            .await?
904            .ok_or_else(|| anyhow::anyhow!("Session not found"))?;
905
906        if include_messages {
907            let conv = self.get_conversation(&session.id).await?;
908            session.message_count = conv.messages().len();
909            session.conversation = Some(conv);
910        } else {
911            let count =
912                sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM messages WHERE session_id = ?")
913                    .bind(&session.id)
914                    .fetch_one(&self.pool)
915                    .await? as usize;
916            session.message_count = count;
917        }
918
919        Ok(session)
920    }
921
922    #[allow(clippy::too_many_lines)]
923    async fn apply_update(&self, builder: SessionUpdateBuilder) -> Result<()> {
924        let mut updates = Vec::new();
925        let mut query = String::from("UPDATE sessions SET ");
926
927        macro_rules! add_update {
928            ($field:expr, $name:expr) => {
929                if $field.is_some() {
930                    if !updates.is_empty() {
931                        query.push_str(", ");
932                    }
933                    updates.push($name);
934                    query.push_str($name);
935                    query.push_str(" = ?");
936                }
937            };
938        }
939
940        add_update!(builder.name, "name");
941        add_update!(builder.user_set_name, "user_set_name");
942        add_update!(builder.session_type, "session_type");
943        add_update!(builder.working_dir, "working_dir");
944        add_update!(builder.extension_data, "extension_data");
945        add_update!(builder.total_tokens, "total_tokens");
946        add_update!(builder.input_tokens, "input_tokens");
947        add_update!(builder.output_tokens, "output_tokens");
948        add_update!(builder.accumulated_total_tokens, "accumulated_total_tokens");
949        add_update!(builder.accumulated_input_tokens, "accumulated_input_tokens");
950        add_update!(
951            builder.accumulated_output_tokens,
952            "accumulated_output_tokens"
953        );
954        add_update!(builder.schedule_id, "schedule_id");
955        add_update!(builder.recipe, "recipe_json");
956        add_update!(builder.user_recipe_values, "user_recipe_values_json");
957        add_update!(builder.provider_name, "provider_name");
958        add_update!(builder.model_config, "model_config_json");
959
960        if updates.is_empty() {
961            return Ok(());
962        }
963
964        query.push_str(", ");
965        query.push_str("updated_at = datetime('now') WHERE id = ?");
966
967        let mut q = sqlx::query(&query);
968
969        if let Some(name) = builder.name {
970            q = q.bind(name);
971        }
972        if let Some(user_set_name) = builder.user_set_name {
973            q = q.bind(user_set_name);
974        }
975        if let Some(session_type) = builder.session_type {
976            q = q.bind(session_type.to_string());
977        }
978        if let Some(wd) = builder.working_dir {
979            q = q.bind(wd.to_string_lossy().to_string());
980        }
981        if let Some(ed) = builder.extension_data {
982            q = q.bind(serde_json::to_string(&ed)?);
983        }
984        if let Some(tt) = builder.total_tokens {
985            q = q.bind(tt);
986        }
987        if let Some(it) = builder.input_tokens {
988            q = q.bind(it);
989        }
990        if let Some(ot) = builder.output_tokens {
991            q = q.bind(ot);
992        }
993        if let Some(att) = builder.accumulated_total_tokens {
994            q = q.bind(att);
995        }
996        if let Some(ait) = builder.accumulated_input_tokens {
997            q = q.bind(ait);
998        }
999        if let Some(aot) = builder.accumulated_output_tokens {
1000            q = q.bind(aot);
1001        }
1002        if let Some(sid) = builder.schedule_id {
1003            q = q.bind(sid);
1004        }
1005        if let Some(recipe) = builder.recipe {
1006            let recipe_json = recipe.map(|r| serde_json::to_string(&r)).transpose()?;
1007            q = q.bind(recipe_json);
1008        }
1009        if let Some(user_recipe_values) = builder.user_recipe_values {
1010            let user_recipe_values_json = user_recipe_values
1011                .map(|urv| serde_json::to_string(&urv))
1012                .transpose()?;
1013            q = q.bind(user_recipe_values_json);
1014        }
1015        if let Some(provider_name) = builder.provider_name {
1016            q = q.bind(provider_name);
1017        }
1018        if let Some(model_config) = builder.model_config {
1019            let model_config_json = model_config
1020                .map(|mc| serde_json::to_string(&mc))
1021                .transpose()?;
1022            q = q.bind(model_config_json);
1023        }
1024
1025        let mut tx = self.pool.begin().await?;
1026        q = q.bind(&builder.session_id);
1027        q.execute(&mut *tx).await?;
1028
1029        tx.commit().await?;
1030        Ok(())
1031    }
1032
1033    async fn get_conversation(&self, session_id: &str) -> Result<Conversation> {
1034        let rows = sqlx::query_as::<_, (String, String, i64, Option<String>)>(
1035            "SELECT role, content_json, created_timestamp, metadata_json FROM messages WHERE session_id = ? ORDER BY timestamp",
1036        )
1037            .bind(session_id)
1038            .fetch_all(&self.pool)
1039            .await?;
1040
1041        let mut messages = Vec::new();
1042        for (idx, (role_str, content_json, created_timestamp, metadata_json)) in
1043            rows.into_iter().enumerate()
1044        {
1045            let role = match role_str.as_str() {
1046                "user" => Role::User,
1047                "assistant" => Role::Assistant,
1048                _ => continue,
1049            };
1050
1051            let content = serde_json::from_str(&content_json)?;
1052            let metadata = metadata_json
1053                .and_then(|json| serde_json::from_str(&json).ok())
1054                .unwrap_or_default();
1055
1056            let mut message = Message::new(role, created_timestamp, content);
1057            message.metadata = metadata;
1058            message = message.with_id(format!("msg_{}_{}", session_id, idx));
1059            messages.push(message);
1060        }
1061
1062        Ok(Conversation::new_unvalidated(messages))
1063    }
1064
1065    async fn add_message(&self, session_id: &str, message: &Message) -> Result<()> {
1066        let mut tx = self.pool.begin().await?;
1067
1068        let metadata_json = serde_json::to_string(&message.metadata)?;
1069
1070        sqlx::query(
1071            r#"
1072            INSERT INTO messages (session_id, role, content_json, created_timestamp, metadata_json)
1073            VALUES (?, ?, ?, ?, ?)
1074        "#,
1075        )
1076        .bind(session_id)
1077        .bind(role_to_string(&message.role))
1078        .bind(serde_json::to_string(&message.content)?)
1079        .bind(message.created)
1080        .bind(metadata_json)
1081        .execute(&mut *tx)
1082        .await?;
1083
1084        sqlx::query("UPDATE sessions SET updated_at = datetime('now') WHERE id = ?")
1085            .bind(session_id)
1086            .execute(&mut *tx)
1087            .await?;
1088
1089        tx.commit().await?;
1090        Ok(())
1091    }
1092
1093    async fn replace_conversation(
1094        &self,
1095        session_id: &str,
1096        conversation: &Conversation,
1097    ) -> Result<()> {
1098        let mut tx = self.pool.begin().await?;
1099
1100        sqlx::query("DELETE FROM messages WHERE session_id = ?")
1101            .bind(session_id)
1102            .execute(&mut *tx)
1103            .await?;
1104
1105        for message in conversation.messages() {
1106            let metadata_json = serde_json::to_string(&message.metadata)?;
1107
1108            sqlx::query(
1109                r#"
1110            INSERT INTO messages (session_id, role, content_json, created_timestamp, metadata_json)
1111            VALUES (?, ?, ?, ?, ?)
1112        "#,
1113            )
1114            .bind(session_id)
1115            .bind(role_to_string(&message.role))
1116            .bind(serde_json::to_string(&message.content)?)
1117            .bind(message.created)
1118            .bind(metadata_json)
1119            .execute(&mut *tx)
1120            .await?;
1121        }
1122
1123        tx.commit().await?;
1124        Ok(())
1125    }
1126
1127    async fn list_sessions_by_types(&self, types: &[SessionType]) -> Result<Vec<Session>> {
1128        if types.is_empty() {
1129            return Ok(Vec::new());
1130        }
1131
1132        let placeholders: String = types.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
1133        let query = format!(
1134            r#"
1135            SELECT s.id, s.working_dir, s.name, s.description, s.user_set_name, s.session_type, s.created_at, s.updated_at, s.extension_data,
1136                   s.total_tokens, s.input_tokens, s.output_tokens,
1137                   s.accumulated_total_tokens, s.accumulated_input_tokens, s.accumulated_output_tokens,
1138                   s.schedule_id, s.recipe_json, s.user_recipe_values_json,
1139                   s.provider_name, s.model_config_json,
1140                   COUNT(m.id) as message_count
1141            FROM sessions s
1142            INNER JOIN messages m ON s.id = m.session_id
1143            WHERE s.session_type IN ({})
1144            GROUP BY s.id
1145            ORDER BY s.updated_at DESC
1146            "#,
1147            placeholders
1148        );
1149
1150        let mut q = sqlx::query_as::<_, Session>(&query);
1151        for t in types {
1152            q = q.bind(t.to_string());
1153        }
1154
1155        q.fetch_all(&self.pool).await.map_err(Into::into)
1156    }
1157
1158    async fn list_sessions(&self) -> Result<Vec<Session>> {
1159        self.list_sessions_by_types(&[SessionType::User, SessionType::Scheduled])
1160            .await
1161    }
1162
1163    async fn delete_session(&self, session_id: &str) -> Result<()> {
1164        let mut tx = self.pool.begin().await?;
1165
1166        let exists =
1167            sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM sessions WHERE id = ?)")
1168                .bind(session_id)
1169                .fetch_one(&mut *tx)
1170                .await?;
1171
1172        if !exists {
1173            return Err(anyhow::anyhow!("Session not found"));
1174        }
1175
1176        sqlx::query("DELETE FROM messages WHERE session_id = ?")
1177            .bind(session_id)
1178            .execute(&mut *tx)
1179            .await?;
1180
1181        sqlx::query("DELETE FROM sessions WHERE id = ?")
1182            .bind(session_id)
1183            .execute(&mut *tx)
1184            .await?;
1185
1186        tx.commit().await?;
1187        Ok(())
1188    }
1189
1190    async fn get_insights(&self) -> Result<SessionInsights> {
1191        let row = sqlx::query_as::<_, (i64, Option<i64>)>(
1192            r#"
1193            SELECT COUNT(*) as total_sessions,
1194                   COALESCE(SUM(COALESCE(accumulated_total_tokens, total_tokens, 0)), 0) as total_tokens
1195            FROM sessions
1196            "#,
1197        )
1198            .fetch_one(&self.pool)
1199            .await?;
1200
1201        Ok(SessionInsights {
1202            total_sessions: row.0 as usize,
1203            total_tokens: row.1.unwrap_or(0),
1204        })
1205    }
1206
1207    async fn export_session(&self, id: &str) -> Result<String> {
1208        let session = self.get_session(id, true).await?;
1209        serde_json::to_string_pretty(&session).map_err(Into::into)
1210    }
1211
1212    async fn import_session(&self, json: &str) -> Result<Session> {
1213        let import: Session = serde_json::from_str(json)?;
1214
1215        let session = self
1216            .create_session(
1217                import.working_dir.clone(),
1218                import.name.clone(),
1219                import.session_type,
1220            )
1221            .await?;
1222
1223        let mut builder = SessionUpdateBuilder::new(session.id.clone())
1224            .extension_data(import.extension_data)
1225            .total_tokens(import.total_tokens)
1226            .input_tokens(import.input_tokens)
1227            .output_tokens(import.output_tokens)
1228            .accumulated_total_tokens(import.accumulated_total_tokens)
1229            .accumulated_input_tokens(import.accumulated_input_tokens)
1230            .accumulated_output_tokens(import.accumulated_output_tokens)
1231            .schedule_id(import.schedule_id)
1232            .recipe(import.recipe)
1233            .user_recipe_values(import.user_recipe_values);
1234
1235        if import.user_set_name {
1236            builder = builder.user_provided_name(import.name.clone());
1237        }
1238
1239        self.apply_update(builder).await?;
1240
1241        if let Some(conversation) = import.conversation {
1242            self.replace_conversation(&session.id, &conversation)
1243                .await?;
1244        }
1245
1246        self.get_session(&session.id, true).await
1247    }
1248
1249    async fn copy_session(&self, session_id: &str, new_name: String) -> Result<Session> {
1250        let original_session = self.get_session(session_id, true).await?;
1251
1252        let new_session = self
1253            .create_session(
1254                original_session.working_dir.clone(),
1255                new_name,
1256                original_session.session_type,
1257            )
1258            .await?;
1259
1260        let builder = SessionUpdateBuilder::new(new_session.id.clone())
1261            .extension_data(original_session.extension_data)
1262            .schedule_id(original_session.schedule_id)
1263            .recipe(original_session.recipe)
1264            .user_recipe_values(original_session.user_recipe_values);
1265
1266        self.apply_update(builder).await?;
1267
1268        if let Some(conversation) = original_session.conversation {
1269            self.replace_conversation(&new_session.id, &conversation)
1270                .await?;
1271        }
1272
1273        self.get_session(&new_session.id, true).await
1274    }
1275
1276    async fn truncate_conversation(&self, session_id: &str, timestamp: i64) -> Result<()> {
1277        sqlx::query("DELETE FROM messages WHERE session_id = ? AND created_timestamp >= ?")
1278            .bind(session_id)
1279            .bind(timestamp)
1280            .execute(&self.pool)
1281            .await?;
1282
1283        Ok(())
1284    }
1285
1286    async fn search_chat_history(
1287        &self,
1288        query: &str,
1289        limit: Option<usize>,
1290        after_date: Option<chrono::DateTime<chrono::Utc>>,
1291        before_date: Option<chrono::DateTime<chrono::Utc>>,
1292        exclude_session_id: Option<String>,
1293    ) -> Result<crate::session::chat_history_search::ChatRecallResults> {
1294        use crate::session::chat_history_search::ChatHistorySearch;
1295
1296        ChatHistorySearch::new(
1297            &self.pool,
1298            query,
1299            limit,
1300            after_date,
1301            before_date,
1302            exclude_session_id,
1303        )
1304        .execute()
1305        .await
1306    }
1307}
1308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::conversation::message::{Message, MessageContent};
    use tempfile::TempDir;

    const NUM_SESSIONS: i32 = 10;

    /// Opens a fresh storage backed by `file_name` inside `dir`.
    async fn open_storage(dir: &TempDir, file_name: &str) -> Arc<SessionStorage> {
        let db_path = dir.path().join(file_name);
        Arc::new(SessionStorage::create(&db_path).await.unwrap())
    }

    /// Builds a single-text message with no id, default metadata and the current
    /// time in milliseconds as its creation timestamp.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            id: None,
            role,
            created: chrono::Utc::now().timestamp_millis(),
            content: vec![MessageContent::text(text)],
            metadata: Default::default(),
        }
    }

    /// Creates several sessions, adds two messages to each, updates them, and
    /// checks ids, listing and aggregate insights.
    #[tokio::test]
    async fn test_concurrent_session_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_sessions.db").await;

        // Create the sessions serially to avoid SQLite concurrent-lock problems.
        let mut created_ids = Vec::new();
        for i in 0..NUM_SESSIONS {
            let working_dir = PathBuf::from(format!("/tmp/test_{}", i));
            let description = format!("Test session {}", i);

            let session = storage
                .create_session(working_dir.clone(), description, SessionType::User)
                .await
                .unwrap();

            let user_message = text_message(Role::User, "hello world");
            storage
                .add_message(&session.id, &user_message)
                .await
                .unwrap();

            let assistant_message = text_message(Role::Assistant, "sup world?");
            storage
                .add_message(&session.id, &assistant_message)
                .await
                .unwrap();

            let update = SessionUpdateBuilder::new(session.id.clone())
                .user_provided_name(format!("Updated session {}", i))
                .total_tokens(Some(100 * i));
            storage.apply_update(update).await.unwrap();

            let updated = storage.get_session(&session.id, true).await.unwrap();
            assert_eq!(updated.message_count, 2);
            assert_eq!(updated.total_tokens, Some(100 * i));

            created_ids.push(session.id);
        }

        let expected_count = NUM_SESSIONS as usize;
        assert_eq!(created_ids.len(), expected_count);

        let unique_ids: std::collections::HashSet<_> = created_ids.iter().collect();
        assert_eq!(unique_ids.len(), expected_count);

        let sessions = storage.list_sessions().await.unwrap();
        assert_eq!(sessions.len(), expected_count);

        for listed in &sessions {
            assert_eq!(listed.message_count, 2);
            assert!(listed.name.starts_with("Updated session"));
        }

        let insights = storage.get_insights().await.unwrap();
        assert_eq!(insights.total_sessions, expected_count);
        // Sum of 100 * i for i in 0..NUM_SESSIONS.
        let expected_tokens = 100 * NUM_SESSIONS * (NUM_SESSIONS - 1) / 2;
        assert_eq!(insights.total_tokens, expected_tokens as i64);
    }

    /// Exports a populated session and imports it again, checking that the copy
    /// gets a new id but keeps name, working directory, token counts and messages.
    #[tokio::test]
    async fn test_export_import_roundtrip() {
        const DESCRIPTION: &str = "Original session";
        const TOTAL_TOKENS: i32 = 500;
        const INPUT_TOKENS: i32 = 300;
        const OUTPUT_TOKENS: i32 = 200;
        const ACCUMULATED_TOKENS: i32 = 1000;
        const USER_MESSAGE: &str = "test message";
        const ASSISTANT_MESSAGE: &str = "test response";

        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_export.db").await;

        let original = storage
            .create_session(
                PathBuf::from("/tmp/test"),
                DESCRIPTION.to_string(),
                SessionType::User,
            )
            .await
            .unwrap();

        let token_update = SessionUpdateBuilder::new(original.id.clone())
            .total_tokens(Some(TOTAL_TOKENS))
            .input_tokens(Some(INPUT_TOKENS))
            .output_tokens(Some(OUTPUT_TOKENS))
            .accumulated_total_tokens(Some(ACCUMULATED_TOKENS));
        storage.apply_update(token_update).await.unwrap();

        let user_message = text_message(Role::User, USER_MESSAGE);
        storage
            .add_message(&original.id, &user_message)
            .await
            .unwrap();

        let assistant_message = text_message(Role::Assistant, ASSISTANT_MESSAGE);
        storage
            .add_message(&original.id, &assistant_message)
            .await
            .unwrap();

        let exported = storage.export_session(&original.id).await.unwrap();
        let imported = storage.import_session(&exported).await.unwrap();

        assert_ne!(imported.id, original.id);
        assert_eq!(imported.name, DESCRIPTION);
        assert_eq!(imported.working_dir, PathBuf::from("/tmp/test"));
        assert_eq!(imported.total_tokens, Some(TOTAL_TOKENS));
        assert_eq!(imported.input_tokens, Some(INPUT_TOKENS));
        assert_eq!(imported.output_tokens, Some(OUTPUT_TOKENS));
        assert_eq!(imported.accumulated_total_tokens, Some(ACCUMULATED_TOKENS));
        assert_eq!(imported.message_count, 2);

        let conversation = imported.conversation.unwrap();
        let messages = conversation.messages();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].role, Role::User);
        assert_eq!(messages[1].role, Role::Assistant);
    }

    /// Imports a JSON document that uses the `description` key instead of `name`
    /// and checks that the value ends up as the session name.
    #[tokio::test]
    async fn test_import_session_with_description_field() {
        const OLD_FORMAT_JSON: &str = r#"{
            "id": "20240101_1",
            "description": "Old format session",
            "user_set_name": true,
            "working_dir": "/tmp/test",
            "created_at": "2024-01-01T00:00:00Z",
            "updated_at": "2024-01-01T00:00:00Z",
            "extension_data": {},
            "message_count": 0
        }"#;

        let temp_dir = TempDir::new().unwrap();
        let storage = open_storage(&temp_dir, "test_import.db").await;

        let imported = storage.import_session(OLD_FORMAT_JSON).await.unwrap();

        assert_eq!(imported.name, "Old format session");
        assert!(imported.user_set_name);
        assert_eq!(imported.working_dir, PathBuf::from("/tmp/test"));
    }
}