Skip to main content

bamboo_infrastructure/storage/
v2.rs

1//! Session storage V2 (folder-per-session + global index).
2//!
3//! Storage layout under `bamboo_home_dir`:
4//! - `sessions.json` (global index, O(1) session_id -> rel_path)
5//! - `sessions/<root_id>/session.json`
6//! - `sessions/<root_id>/children/<child_id>/session.json`
7//! - `.../attachments/` (files; session.json stores references, never base64)
8//!
9//! Notes:
10//! - This is a greenfield format (no migration). Old on-disk layouts are ignored.
11//! - Directory scanning is only used for dev-only index rebuild/recovery (not in hot paths).
12
13use std::collections::{HashMap, HashSet};
14use std::io;
15use std::path::{Path, PathBuf};
16
17use base64::Engine;
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use tokio::fs;
21use tokio::sync::{Mutex, RwLock};
22use uuid::Uuid;
23
24use bamboo_domain::ProviderModelRef;
25use bamboo_domain::ReasoningEffort;
26use bamboo_domain::{Role, Session, SessionKind, TokenBudgetUsage};
27
28use crate::storage::search_index::{should_index_session, SessionSearchIndex};
29use bamboo_domain::AttachmentReader;
30use bamboo_domain::Storage;
31
/// Builds an [`io::Error`] of kind [`io::ErrorKind::Other`] whose payload is `message`.
///
/// The storage API in this module reports every failure as `io::Result`, so
/// validation and (de)serialization problems are funnelled through this helper.
fn other_io_error(message: impl Into<String>) -> io::Error {
    let text: String = message.into();
    io::Error::new(io::ErrorKind::Other, text)
}
35
36fn validate_session_id(session_id: &str) -> io::Result<()> {
37    if session_id.is_empty()
38        || session_id.contains('/')
39        || session_id.contains('\\')
40        || session_id.contains("..")
41    {
42        return Err(other_io_error(format!("invalid session id: {session_id}")));
43    }
44    Ok(())
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct SessionIndexEntry {
49    pub id: String,
50    pub kind: SessionKind,
51    /// Path relative to `bamboo_home_dir` (e.g. "sessions/<id>" or "sessions/<root>/children/<id>").
52    pub rel_path: String,
53    pub title: String,
54    pub pinned: bool,
55    pub parent_session_id: Option<String>,
56    pub root_session_id: String,
57    pub spawn_depth: u32,
58    #[serde(default)]
59    pub model: String,
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub model_ref: Option<ProviderModelRef>,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub reasoning_effort: Option<ReasoningEffort>,
64    /// If the session was created by a schedule, store the schedule id here for fast filtering.
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub created_by_schedule_id: Option<String>,
67    /// If the session was created by a specific schedule run, keep the run id here.
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub schedule_run_id: Option<String>,
70    pub created_at: DateTime<Utc>,
71    pub updated_at: DateTime<Utc>,
72    pub last_activity_at: DateTime<Utc>,
73    pub message_count: usize,
74    pub has_attachments: bool,
75    /// Last known run status for this session
76    /// ("pending" | "running" | "completed" | "error" | "cancelled" | "skipped").
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub last_run_status: Option<String>,
79    /// Last known terminal error message, if any.
80    #[serde(default, skip_serializing_if = "Option::is_none")]
81    pub last_run_error: Option<String>,
82    /// Last token usage information (updated after each LLM call).
83    ///
84    /// Stored in the global index so the frontend can display token usage without
85    /// loading full session.json for every row.
86    #[serde(default, skip_serializing_if = "Option::is_none")]
87    pub token_usage: Option<TokenBudgetUsage>,
88    /// SubAgent profile id for child sessions spawned by `SubSession.create`.
89    /// Mirrored into the index from `session.metadata["subagent_type"]` so the
90    /// frontend can render role badges (e.g. "general-purpose", "plan") on the
91    /// child-session list without loading each session.json.
92    /// `None` for root sessions and for legacy children created before this
93    /// field was introduced.
94    #[serde(default, skip_serializing_if = "Option::is_none")]
95    pub subagent_type: Option<String>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct SessionsIndex {
100    pub version: u32,
101    pub updated_at: DateTime<Utc>,
102    pub sessions: HashMap<String, SessionIndexEntry>,
103}
104
105impl SessionsIndex {
106    fn empty() -> Self {
107        Self {
108            version: 2,
109            updated_at: Utc::now(),
110            sessions: HashMap::new(),
111        }
112    }
113}
114
115#[derive(Debug)]
116pub struct SessionStoreV2 {
117    bamboo_home_dir: PathBuf,
118    sessions_dir: PathBuf,
119    index_path: PathBuf,
120    search_index: SessionSearchIndex,
121    index: RwLock<SessionsIndex>,
122    /// Serializes on-disk index writes (and any multi-step operations that must be atomic-ish).
123    write_lock: Mutex<()>,
124}
125
126impl SessionStoreV2 {
127    pub async fn new(bamboo_home_dir: PathBuf) -> io::Result<Self> {
128        let sessions_dir = bamboo_home_dir.join("sessions");
129        let index_path = bamboo_home_dir.join("sessions.json");
130        let search_index = SessionSearchIndex::new(bamboo_home_dir.join("session_search.db"));
131
132        fs::create_dir_all(&sessions_dir).await?;
133        search_index.init().await?;
134
135        let index = if index_path.exists() {
136            let raw = fs::read_to_string(&index_path).await?;
137            serde_json::from_str(&raw)
138                .map_err(|e| other_io_error(format!("invalid sessions.json: {e}")))?
139        } else {
140            let index = SessionsIndex::empty();
141            // Persist immediately so "index is mandatory" holds from boot.
142            let tmp = index_path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
143            fs::write(
144                &tmp,
145                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
146            )
147            .await?;
148            atomic_rename(&tmp, &index_path).await?;
149            index
150        };
151
152        let storage = Self {
153            bamboo_home_dir,
154            sessions_dir,
155            index_path,
156            search_index,
157            index: RwLock::new(index),
158            write_lock: Mutex::new(()),
159        };
160
161        Ok(storage)
162    }
163
164    pub fn search_index(&self) -> &SessionSearchIndex {
165        &self.search_index
166    }
167
168    pub fn bamboo_home_dir(&self) -> &Path {
169        &self.bamboo_home_dir
170    }
171
172    pub fn index_path(&self) -> &Path {
173        &self.index_path
174    }
175
176    pub async fn rebuild_search_index(&self) -> io::Result<()> {
177        let session_ids = {
178            let index = self.index.read().await;
179            index.sessions.keys().cloned().collect::<Vec<_>>()
180        };
181        for session_id in session_ids {
182            if let Some(session) = self.load_session(&session_id).await? {
183                if !should_index_session(session.updated_at) {
184                    continue;
185                }
186                if let Err(error) = self.search_index.upsert_session(&session).await {
187                    tracing::warn!(
188                        "failed to rebuild search index entry for {}: {}",
189                        session_id,
190                        error
191                    );
192                }
193            }
194        }
195        Ok(())
196    }
197
198    pub fn sessions_root_dir(&self) -> &Path {
199        &self.sessions_dir
200    }
201
202    fn root_rel_path(session_id: &str) -> String {
203        format!("sessions/{session_id}")
204    }
205
206    fn child_rel_path(root_id: &str, child_id: &str) -> String {
207        format!("sessions/{root_id}/children/{child_id}")
208    }
209
210    fn abs_path_from_rel(&self, rel: &str) -> PathBuf {
211        self.bamboo_home_dir.join(rel)
212    }
213
214    async fn persist_index_locked(&self, index: &SessionsIndex) -> io::Result<()> {
215        let tmp = self
216            .index_path
217            .with_extension(format!("json.tmp.{}", Uuid::new_v4()));
218        let bytes = serde_json::to_vec_pretty(index).map_err(|e| other_io_error(e.to_string()))?;
219        fs::write(&tmp, bytes).await?;
220        atomic_rename(&tmp, &self.index_path).await?;
221        Ok(())
222    }
223
224    async fn update_index<F, T>(&self, f: F) -> io::Result<T>
225    where
226        F: FnOnce(&mut SessionsIndex) -> io::Result<T>,
227    {
228        let _guard = self.write_lock.lock().await;
229        let mut index = self.index.write().await;
230        let out = f(&mut index)?;
231        index.updated_at = Utc::now();
232        self.persist_index_locked(&index).await?;
233        Ok(out)
234    }
235
236    pub async fn list_index_entries(&self) -> Vec<SessionIndexEntry> {
237        let index = self.index.read().await;
238        let mut items: Vec<_> = index.sessions.values().cloned().collect();
239        items.sort_by_key(|b| std::cmp::Reverse(b.updated_at));
240        items
241    }
242
243    pub async fn get_index_entry(&self, session_id: &str) -> Option<SessionIndexEntry> {
244        let index = self.index.read().await;
245        index.sessions.get(session_id).cloned()
246    }
247
248    pub async fn resolve_rel_path(&self, session_id: &str) -> Option<String> {
249        self.get_index_entry(session_id).await.map(|e| e.rel_path)
250    }
251
252    async fn ensure_session_dirs(&self, session: &Session) -> io::Result<String> {
253        validate_session_id(&session.id)?;
254
255        let rel_path = match session.kind {
256            SessionKind::Root => Self::root_rel_path(&session.id),
257            SessionKind::Child => {
258                let root_id = session.root_session_id.trim();
259                let parent_id = session.parent_session_id.as_deref().unwrap_or("").trim();
260                if root_id.is_empty() || parent_id.is_empty() {
261                    return Err(other_io_error(
262                        "child session missing root_session_id/parent_session_id",
263                    ));
264                }
265                if parent_id != root_id {
266                    return Err(other_io_error(
267                        "child session parent_session_id must equal root_session_id (no nesting)",
268                    ));
269                }
270                validate_session_id(root_id)?;
271                Self::child_rel_path(root_id, &session.id)
272            }
273        };
274
275        let abs_dir = self.abs_path_from_rel(&rel_path);
276        fs::create_dir_all(&abs_dir).await?;
277        // Ensure expected subdirs (lazy; cheap).
278        fs::create_dir_all(abs_dir.join("attachments")).await?;
279        if session.kind == SessionKind::Root {
280            fs::create_dir_all(abs_dir.join("children")).await?;
281        }
282        Ok(rel_path)
283    }
284
285    async fn session_json_path(&self, session_id: &str) -> io::Result<Option<PathBuf>> {
286        if let Some(rel) = self.resolve_rel_path(session_id).await {
287            Ok(Some(self.abs_path_from_rel(&rel).join("session.json")))
288        } else {
289            Ok(None)
290        }
291    }
292
293    async fn attachments_dir(&self, session_id: &str) -> io::Result<Option<PathBuf>> {
294        if let Some(rel) = self.resolve_rel_path(session_id).await {
295            Ok(Some(self.abs_path_from_rel(&rel).join("attachments")))
296        } else {
297            Ok(None)
298        }
299    }
300
301    async fn compute_has_attachments(&self, session_id: &str) -> bool {
302        let Ok(Some(dir)) = self.attachments_dir(session_id).await else {
303            return false;
304        };
305        let Ok(mut rd) = fs::read_dir(dir).await else {
306            return false;
307        };
308        rd.next_entry().await.ok().flatten().is_some()
309    }
310
311    async fn upsert_index_from_session(
312        &self,
313        session: &Session,
314        rel_path: String,
315    ) -> io::Result<()> {
316        let has_attachments = self.compute_has_attachments(&session.id).await;
317        let last_run_status = session
318            .metadata
319            .get("last_run_status")
320            .cloned()
321            .filter(|value| !value.trim().is_empty());
322        let last_run_error = session
323            .metadata
324            .get("last_run_error")
325            .cloned()
326            .filter(|value| !value.trim().is_empty());
327        let created_by_schedule_id = session
328            .metadata
329            .get("created_by_schedule_id")
330            .cloned()
331            .filter(|v| !v.trim().is_empty());
332        let schedule_run_id = session
333            .metadata
334            .get("schedule_run_id")
335            .cloned()
336            .filter(|v| !v.trim().is_empty());
337        let subagent_type = session
338            .metadata
339            .get("subagent_type")
340            .cloned()
341            .filter(|v| !v.trim().is_empty());
342        self.update_index(|index| {
343            index.sessions.insert(
344                session.id.clone(),
345                SessionIndexEntry {
346                    id: session.id.clone(),
347                    kind: session.kind,
348                    rel_path,
349                    title: session.title.clone(),
350                    pinned: session.pinned,
351                    parent_session_id: session.parent_session_id.clone(),
352                    root_session_id: session.root_session_id.clone(),
353                    spawn_depth: session.spawn_depth,
354                    model: session.model.clone(),
355                    model_ref: session.model_ref.clone(),
356                    reasoning_effort: session.reasoning_effort,
357                    created_by_schedule_id,
358                    schedule_run_id,
359                    created_at: session.created_at,
360                    updated_at: session.updated_at,
361                    last_activity_at: session.updated_at,
362                    message_count: session.messages.len(),
363                    has_attachments,
364                    last_run_status,
365                    last_run_error,
366                    token_usage: session.token_usage.clone(),
367                    subagent_type,
368                },
369            );
370            Ok(())
371        })
372        .await?;
373        Ok(())
374    }
375
376    pub async fn write_image_attachment(
377        &self,
378        session: &Session,
379        raw_base64_or_data_url: &str,
380        mime_hint: Option<&str>,
381    ) -> io::Result<(String, String)> {
382        let (mime, base64_data) =
383            parse_data_url_base64(raw_base64_or_data_url).unwrap_or_else(|| {
384                (
385                    mime_hint.unwrap_or("image/png").trim().to_string(),
386                    raw_base64_or_data_url.trim().to_string(),
387                )
388            });
389
390        let bytes = base64::engine::general_purpose::STANDARD
391            .decode(base64_data.as_bytes())
392            .map_err(|e| other_io_error(format!("invalid base64 image data: {e}")))?;
393
394        let attachment_id = Uuid::new_v4().to_string();
395        let ext = mime_to_extension(mime.as_str()).unwrap_or("bin");
396
397        let rel_path = self.ensure_session_dirs(session).await?;
398        let abs_dir = self.abs_path_from_rel(&rel_path);
399        let attachments_dir = abs_dir.join("attachments");
400        fs::create_dir_all(&attachments_dir).await?;
401
402        let path = attachments_dir.join(format!("{attachment_id}.{ext}"));
403        let tmp = path.with_extension(format!("{ext}.tmp.{}", Uuid::new_v4()));
404        fs::write(&tmp, &bytes).await?;
405        atomic_rename(&tmp, &path).await?;
406
407        Ok((
408            attachment_id.clone(),
409            format!("bamboo-attachment://{}/{}", session.id, attachment_id),
410        ))
411    }
412
413    /// Read an attachment by id, returning bytes + inferred MIME.
414    pub async fn read_attachment(
415        &self,
416        session_id: &str,
417        attachment_id: &str,
418    ) -> io::Result<Option<(Vec<u8>, String)>> {
419        validate_session_id(session_id)?;
420        validate_session_id(attachment_id)?;
421        let Some(dir) = self.attachments_dir(session_id).await? else {
422            return Ok(None);
423        };
424        if !dir.exists() {
425            return Ok(None);
426        }
427
428        let mut rd = fs::read_dir(&dir).await?;
429        while let Some(entry) = rd.next_entry().await? {
430            let file_name = entry.file_name();
431            let file_name = file_name.to_string_lossy();
432            if !file_name.starts_with(attachment_id) {
433                continue;
434            }
435            // Match "<id>.<ext>"
436            if file_name.len() <= attachment_id.len() + 1
437                || !file_name.as_bytes()[attachment_id.len()].eq(&b'.')
438            {
439                continue;
440            }
441            let ext = file_name.split('.').next_back().unwrap_or("bin");
442            let mime = extension_to_mime(ext).unwrap_or("application/octet-stream");
443            let bytes = fs::read(entry.path()).await?;
444            return Ok(Some((bytes, mime.to_string())));
445        }
446
447        Ok(None)
448    }
449
450    pub async fn clear_session(&self, session_id: &str) -> io::Result<bool> {
451        let Some(mut session) = self.load_session(session_id).await? else {
452            return Ok(false);
453        };
454
455        // Keep only the first System message if present; drop all other messages.
456        let system_msg = session
457            .messages
458            .iter()
459            .find(|m| matches!(m.role, Role::System))
460            .cloned();
461        session.messages.clear();
462        if let Some(system) = system_msg {
463            session.messages.push(system);
464        }
465
466        // Clearing history invalidates derived context state.
467        session.token_usage = None;
468        session.conversation_summary = None;
469        session.updated_at = Utc::now();
470
471        // Remove attachments on disk.
472        if let Ok(Some(dir)) = self.attachments_dir(session_id).await {
473            let _ = fs::remove_dir_all(&dir).await;
474            let _ = fs::create_dir_all(&dir).await;
475        }
476
477        self.save_session(&session).await?;
478        Ok(true)
479    }
480
481    pub async fn cleanup(&self, mode: CleanupMode, keep_pinned: bool) -> io::Result<CleanupResult> {
482        // All decisions are index-only.
483        let entries = {
484            self.index
485                .read()
486                .await
487                .sessions
488                .values()
489                .cloned()
490                .collect::<Vec<_>>()
491        };
492
493        let pinned_child_roots: HashSet<String> = if keep_pinned {
494            entries
495                .iter()
496                .filter(|e| e.kind == SessionKind::Child && e.pinned)
497                .filter_map(|e| e.parent_session_id.clone())
498                .collect()
499        } else {
500            HashSet::new()
501        };
502
503        // Helper to decide whether an entry is protected.
504        let is_protected = |e: &SessionIndexEntry| -> bool {
505            if !keep_pinned {
506                return false;
507            }
508            if e.pinned {
509                return true;
510            }
511            // A root with pinned child cannot be deleted.
512            if e.kind == SessionKind::Root && pinned_child_roots.contains(&e.id) {
513                return true;
514            }
515            false
516        };
517
518        // Determine deletions as a set of session ids (roots and/or children).
519        let mut delete_child_ids = HashSet::<String>::new();
520        let mut delete_root_ids = HashSet::<String>::new();
521
522        match mode {
523            CleanupMode::Children => {
524                for e in entries.iter().filter(|e| e.kind == SessionKind::Child) {
525                    if is_protected(e) {
526                        continue;
527                    }
528                    delete_child_ids.insert(e.id.clone());
529                }
530            }
531            CleanupMode::All | CleanupMode::Empty => {
532                // First decide which roots can be deleted.
533                for root in entries.iter().filter(|e| e.kind == SessionKind::Root) {
534                    if is_protected(root) {
535                        continue;
536                    }
537                    if mode == CleanupMode::Empty && root.message_count > 1 {
538                        continue;
539                    }
540                    delete_root_ids.insert(root.id.clone());
541                }
542
543                // For roots we keep, we may still delete some children (e.g., unpinned, or empty).
544                for child in entries.iter().filter(|e| e.kind == SessionKind::Child) {
545                    if delete_root_ids.contains(&child.root_session_id) {
546                        continue; // will be deleted with root.
547                    }
548                    if is_protected(child) {
549                        continue;
550                    }
551                    if mode == CleanupMode::Empty && child.message_count > 1 {
552                        continue;
553                    }
554                    delete_child_ids.insert(child.id.clone());
555                }
556            }
557        }
558
559        // Pre-compute full deleted id set for a truthful response payload.
560        let mut deleted_ids = HashSet::<String>::new();
561        for root_id in delete_root_ids.iter() {
562            for e in entries.iter().filter(|e| e.root_session_id == *root_id) {
563                deleted_ids.insert(e.id.clone());
564            }
565        }
566        for child_id in delete_child_ids.iter() {
567            deleted_ids.insert(child_id.clone());
568        }
569
570        // Apply deletions (roots first; they delete children implicitly).
571        for root_id in delete_root_ids.iter() {
572            let _ = self.delete_session_recursive(root_id, true).await?;
573        }
574        for child_id in delete_child_ids.iter() {
575            let _ = self.delete_session_recursive(child_id, true).await?;
576        }
577        let mut deleted_session_ids: Vec<String> = deleted_ids.into_iter().collect();
578        deleted_session_ids.sort();
579        Ok(CleanupResult {
580            deleted_count: deleted_session_ids.len(),
581            deleted_session_ids,
582        })
583    }
584
585    /// Development-only: hard reset all sessions and the index.
586    ///
587    /// This is the supported "greenfield" mechanism. It deletes:
588    /// - `bamboo_home_dir/sessions/`
589    /// - `bamboo_home_dir/sessions.json` (rewritten to empty index)
590    pub async fn dev_reset(&self) -> io::Result<()> {
591        let _guard = self.write_lock.lock().await;
592
593        // Remove the sessions directory entirely.
594        let _ = fs::remove_dir_all(&self.sessions_dir).await;
595        fs::create_dir_all(&self.sessions_dir).await?;
596
597        // Reset in-memory index and persist.
598        {
599            let mut index = self.index.write().await;
600            *index = SessionsIndex::empty();
601            self.persist_index_locked(&index).await?;
602        }
603
604        Ok(())
605    }
606
607    /// Delete a session. If the session is a root, deletes its entire directory (and all child sessions).
608    /// If the session is a child, deletes only that child directory.
609    ///
610    /// `force=true` ignores pinned protection; callers must enforce confirmations at the API/UI layer.
611    pub async fn delete_session_recursive(
612        &self,
613        session_id: &str,
614        force: bool,
615    ) -> io::Result<bool> {
616        let entry = self.get_index_entry(session_id).await;
617        let Some(entry) = entry else {
618            return Ok(false);
619        };
620
621        if !force && entry.pinned {
622            return Err(other_io_error(
623                "refusing to delete pinned session without force",
624            ));
625        }
626
627        match entry.kind {
628            SessionKind::Child => {
629                let abs_dir = self.abs_path_from_rel(&entry.rel_path);
630                let _ = fs::remove_dir_all(&abs_dir).await;
631                self.update_index(|index| {
632                    index.sessions.remove(session_id);
633                    Ok(())
634                })
635                .await?;
636                if let Err(error) = self.search_index.delete_session(session_id).await {
637                    tracing::warn!(
638                        "failed to delete session search index row for {}: {}",
639                        session_id,
640                        error
641                    );
642                }
643                Ok(true)
644            }
645            SessionKind::Root => {
646                let root_id = entry.id.clone();
647                let abs_dir = self.abs_path_from_rel(&entry.rel_path);
648                let _ = fs::remove_dir_all(&abs_dir).await;
649
650                let to_remove_ids = {
651                    let index = self.index.read().await;
652                    index
653                        .sessions
654                        .values()
655                        .filter(|e| e.root_session_id == root_id)
656                        .map(|e| e.id.clone())
657                        .collect::<Vec<_>>()
658                };
659
660                self.update_index(|index| {
661                    for id in &to_remove_ids {
662                        index.sessions.remove(id);
663                    }
664                    Ok(())
665                })
666                .await?;
667
668                for id in to_remove_ids {
669                    if let Err(error) = self.search_index.delete_session(&id).await {
670                        tracing::warn!(
671                            "failed to delete session search index row for {}: {}",
672                            id,
673                            error
674                        );
675                    }
676                }
677                Ok(true)
678            }
679        }
680    }
681}
682
/// Selects which sessions `SessionStoreV2::cleanup` considers for deletion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CleanupMode {
    /// Root and child sessions alike.
    All,
    /// Only sessions whose indexed message count is at most one.
    Empty,
    /// Only child sessions.
    Children,
}
689
690#[derive(Debug, Clone, Serialize, Deserialize)]
691pub struct CleanupResult {
692    pub deleted_count: usize,
693    pub deleted_session_ids: Vec<String>,
694}
695
696async fn atomic_rename(from: &Path, to: &Path) -> io::Result<()> {
697    // Best-effort atomic on Unix. On Windows, rename cannot overwrite.
698    match fs::rename(from, to).await {
699        Ok(()) => Ok(()),
700        Err(err) => {
701            if to.exists() {
702                let _ = fs::remove_file(to).await;
703            }
704            fs::rename(from, to).await.map_err(|e| {
705                other_io_error(format!(
706                    "failed to rename {:?} -> {:?}: {} (original: {})",
707                    from, to, e, err
708                ))
709            })
710        }
711    }
712}
713
/// Splits a base64 `data:` URL into `(mime, base64_payload)`.
///
/// Expected shape: `data:<mime>[;param...];base64,<data>`. Surrounding
/// whitespace of the whole input, of the MIME type and of the payload is
/// trimmed. The `data:` scheme and the `base64` marker are matched
/// case-insensitively, and `base64` must be a complete `;`-separated parameter
/// (so `;base64x` is not accepted).
///
/// Returns `None` when the input is not a `data:` URL, has no `,` separator,
/// or is not marked as base64. The MIME part may be empty
/// (`data:;base64,...`), in which case an empty string is returned for it.
fn parse_data_url_base64(url: &str) -> Option<(String, String)> {
    const SCHEME: &str = "data:";

    let trimmed = url.trim();
    // `get` also rejects inputs shorter than the scheme or cut mid-character.
    let scheme = trimmed.get(..SCHEME.len())?;
    if !scheme.eq_ignore_ascii_case(SCHEME) {
        return None;
    }
    let rest = &trimmed[SCHEME.len()..];

    let (header, data) = rest.split_once(',')?;
    let mut parts = header.split(';');
    let mime = parts.next()?.trim().to_string();
    // Remaining parts are parameters; one of them must be exactly `base64`.
    if !parts.any(|param| param.trim().eq_ignore_ascii_case("base64")) {
        return None;
    }

    Some((mime, data.trim().to_string()))
}
728
/// Maps an image MIME type to the file extension used for stored attachments.
///
/// Matching ignores surrounding whitespace and ASCII case. Returns `None` for
/// MIME types outside the supported set (png, jpeg, webp, gif, bmp).
fn mime_to_extension(mime: &str) -> Option<&'static str> {
    const KNOWN: [(&str, &str); 5] = [
        ("image/png", "png"),
        ("image/jpeg", "jpg"),
        ("image/webp", "webp"),
        ("image/gif", "gif"),
        ("image/bmp", "bmp"),
    ];

    let wanted = mime.trim();
    KNOWN
        .iter()
        .find(|(name, _)| wanted.eq_ignore_ascii_case(name))
        .map(|&(_, ext)| ext)
}
739
/// Maps a stored attachment's file extension back to its image MIME type.
///
/// Matching ignores surrounding whitespace and ASCII case; both `jpg` and
/// `jpeg` map to `image/jpeg`. Returns `None` for any other extension.
fn extension_to_mime(ext: &str) -> Option<&'static str> {
    const KNOWN: [(&str, &str); 6] = [
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("webp", "image/webp"),
        ("gif", "image/gif"),
        ("bmp", "image/bmp"),
    ];

    let wanted = ext.trim();
    KNOWN
        .iter()
        .find(|(name, _)| wanted.eq_ignore_ascii_case(name))
        .map(|&(_, mime)| mime)
}
750
751#[async_trait::async_trait]
752impl Storage for SessionStoreV2 {
753    async fn save_session(&self, session: &Session) -> io::Result<()> {
754        let rel_path = self.ensure_session_dirs(session).await?;
755        let abs_dir = self.abs_path_from_rel(&rel_path);
756        let path = abs_dir.join("session.json");
757
758        let tmp = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
759        let bytes =
760            serde_json::to_vec_pretty(session).map_err(|e| other_io_error(e.to_string()))?;
761        fs::write(&tmp, bytes).await?;
762        atomic_rename(&tmp, &path).await?;
763
764        self.upsert_index_from_session(session, rel_path).await?;
765        if let Err(error) = self.search_index.upsert_session(session).await {
766            tracing::warn!(
767                "failed to update session search index for {}: {}",
768                session.id,
769                error
770            );
771        }
772        Ok(())
773    }
774
775    async fn load_session(&self, session_id: &str) -> io::Result<Option<Session>> {
776        validate_session_id(session_id)?;
777        let Some(path) = self.session_json_path(session_id).await? else {
778            return Ok(None);
779        };
780        if !path.exists() {
781            return Ok(None);
782        }
783        let raw = fs::read_to_string(path).await?;
784        let session: Session = serde_json::from_str(&raw)
785            .map_err(|e| other_io_error(format!("invalid session.json: {e}")))?;
786        Ok(Some(session))
787    }
788
789    async fn delete_session(&self, session_id: &str) -> io::Result<bool> {
790        // Historical API deletes sessions. In V2, treat this as recursive and forced.
791        self.delete_session_recursive(session_id, true).await
792    }
793}
794
795#[async_trait::async_trait]
796impl AttachmentReader for SessionStoreV2 {
797    async fn read_attachment(
798        &self,
799        session_id: &str,
800        attachment_id: &str,
801    ) -> io::Result<Option<(Vec<u8>, String)>> {
802        SessionStoreV2::read_attachment(self, session_id, attachment_id).await
803    }
804}
805
#[cfg(test)]
mod tests {
    // Unit tests for the V2 session store: construction side effects,
    // save/load/delete round trips, index lookups, session-id validation,
    // relative-path helpers, and the MIME <-> extension mapping helpers.

    use super::*;
    use std::io;
    use tempfile::TempDir;

    /// Builds a store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned together with the store so that the
    /// caller keeps it alive for the duration of the test.
    async fn create_temp_storage() -> io::Result<(SessionStoreV2, TempDir)> {
        // `TempDir::new` already returns `io::Result`, so `?` propagates the
        // original error (and its `ErrorKind`) without re-wrapping.
        let temp_dir = TempDir::new()?;
        let storage = SessionStoreV2::new(temp_dir.path().to_path_buf()).await?;
        Ok((storage, temp_dir))
    }

    #[tokio::test]
    async fn test_new_creates_sessions_directory() -> io::Result<()> {
        let temp_dir = TempDir::new()?;
        let bamboo_home = temp_dir.path().to_path_buf();
        let sessions_dir = bamboo_home.join("sessions");

        assert!(
            !sessions_dir.exists(),
            "sessions directory must not exist before the store is created"
        );
        let _storage = SessionStoreV2::new(bamboo_home).await?;
        assert!(
            sessions_dir.exists(),
            "sessions directory should exist after the store is created"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_new_creates_index_file() -> io::Result<()> {
        let temp_dir = TempDir::new()?;
        let bamboo_home = temp_dir.path().to_path_buf();
        let index_path = bamboo_home.join("sessions.json");

        assert!(
            !index_path.exists(),
            "index file must not exist before the store is created"
        );
        let _storage = SessionStoreV2::new(bamboo_home).await?;
        assert!(
            index_path.exists(),
            "index file should exist after the store is created"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_save_and_load_session() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let session = Session::new("session-1", "test-model");

        storage.save_session(&session).await?;
        let loaded = storage
            .load_session(&session.id)
            .await?
            .expect("a saved session should be loadable");

        assert_eq!(loaded.id, session.id);
        assert_eq!(loaded.model, session.model);

        Ok(())
    }

    #[tokio::test]
    async fn test_load_session_returns_none_when_not_found() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let loaded = storage.load_session("nonexistent").await?;
        assert!(loaded.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn test_list_index_entries_empty() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let entries = storage.list_index_entries().await;
        assert!(entries.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn test_list_index_entries_with_sessions() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;

        let session1 = Session::new("session-1", "model-1");
        let session2 = Session::new("session-2", "model-2");

        storage.save_session(&session1).await?;
        storage.save_session(&session2).await?;

        let entries = storage.list_index_entries().await;
        assert_eq!(entries.len(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_index_entry() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let session = Session::new("session-1", "test-model");

        storage.save_session(&session).await?;

        let entry = storage
            .get_index_entry(&session.id)
            .await
            .expect("a saved session should have an index entry");
        assert_eq!(entry.id, session.id);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_index_entry_returns_none_when_not_found() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let entry = storage.get_index_entry("nonexistent").await;
        assert!(entry.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn test_delete_session() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let session = Session::new("session-1", "test-model");

        storage.save_session(&session).await?;
        assert!(storage.load_session(&session.id).await?.is_some());

        let deleted = storage.delete_session(&session.id).await?;
        assert!(deleted, "deleting an existing session should report true");
        assert!(storage.load_session(&session.id).await?.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_delete_session_returns_false_when_not_found() -> io::Result<()> {
        let (storage, _temp_dir) = create_temp_storage().await?;
        let deleted = storage.delete_session("nonexistent").await?;
        assert!(!deleted);
        Ok(())
    }

    #[test]
    fn test_validate_session_id_empty() {
        assert!(validate_session_id("").is_err());
    }

    #[test]
    fn test_validate_session_id_with_slash() {
        assert!(validate_session_id("session/1").is_err());
    }

    #[test]
    fn test_validate_session_id_with_backslash() {
        assert!(validate_session_id("session\\1").is_err());
    }

    #[test]
    fn test_validate_session_id_with_double_dot() {
        assert!(validate_session_id("session..1").is_err());
    }

    #[test]
    fn test_validate_session_id_valid() {
        assert!(validate_session_id("session-123").is_ok());
    }

    #[test]
    fn test_root_rel_path() {
        let path = SessionStoreV2::root_rel_path("session-123");
        assert_eq!(path, "sessions/session-123");
    }

    #[test]
    fn test_child_rel_path() {
        let path = SessionStoreV2::child_rel_path("root-1", "child-2");
        assert_eq!(path, "sessions/root-1/children/child-2");
    }

    #[test]
    fn test_mime_to_extension() {
        // Table-driven so a failure names the offending input.
        let cases = [
            ("image/png", Some("png")),
            ("image/jpeg", Some("jpg")),
            ("image/webp", Some("webp")),
            ("image/gif", Some("gif")),
            ("image/bmp", Some("bmp")),
            ("unknown/type", None),
        ];
        for (mime, expected) in cases {
            assert_eq!(mime_to_extension(mime), expected, "mime: {mime:?}");
        }
    }

    #[test]
    fn test_extension_to_mime() {
        let cases = [
            ("png", Some("image/png")),
            ("jpg", Some("image/jpeg")),
            ("jpeg", Some("image/jpeg")),
            ("webp", Some("image/webp")),
            ("gif", Some("image/gif")),
            ("bmp", Some("image/bmp")),
            ("unknown", None),
        ];
        for (ext, expected) in cases {
            assert_eq!(extension_to_mime(ext), expected, "extension: {ext:?}");
        }
    }

    #[test]
    fn test_extension_to_mime_case_insensitive() {
        let cases = [
            ("PNG", Some("image/png")),
            ("JPG", Some("image/jpeg")),
            ("JPEG", Some("image/jpeg")),
        ];
        for (ext, expected) in cases {
            assert_eq!(extension_to_mime(ext), expected, "extension: {ext:?}");
        }
    }

    #[test]
    fn test_extension_to_mime_with_whitespace() {
        let cases = [
            ("  png  ", Some("image/png")),
            ("\tjpg\t", Some("image/jpeg")),
        ];
        for (ext, expected) in cases {
            assert_eq!(extension_to_mime(ext), expected, "extension: {ext:?}");
        }
    }
}