Skip to main content

bamboo_storage/
v2.rs

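//! Session storage V2 (folder-per-session + global index).
//!
//! Storage layout under `bamboo_home_dir`:
//! - `sessions.json` (global index, O(1) session_id -> rel_path)
//! - `sessions/<root_id>/session.json` (full session; authoritative for `messages`)
//! - `sessions/<root_id>/runtime.json` (runtime control-plane sidecar: the session minus `messages`)
//! - `sessions/<root_id>/token-usage.jsonl` (append-only log, one JSON line per LLM call)
//! - `sessions/<root_id>/children/<child_id>/...` (same files per child session; nested
//!   children also live flat here, under their tree's root)
//! - `.../attachments/` (files; session.json stores references, never base64)
//!
//! Notes:
//! - This is a greenfield format: pre-V2 on-disk layouts are ignored and never migrated.
//!   (Within V2, `migrate_runtime_sidecars` backfills `runtime.json` once for older sessions.)
//! - Scanning the `sessions/` tree is only used for dev-only index rebuild/recovery (not in hot paths).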
12
13use std::collections::{HashMap, HashSet};
14use std::io;
15use std::path::{Path, PathBuf};
16
17use base64::Engine;
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use tokio::fs;
21use tokio::sync::{Mutex, RwLock};
22use uuid::Uuid;
23
24use bamboo_domain::ProviderModelRef;
25use bamboo_domain::ReasoningEffort;
26use bamboo_domain::{Role, Session, SessionKind, TokenBudgetUsage};
27
28use crate::search_index::{should_index_session, SessionSearchIndex};
29use bamboo_domain::AttachmentReader;
30use bamboo_domain::Storage;
31
/// Wrap a free-form message in an [`io::Error`] of kind [`io::ErrorKind::Other`].
///
/// Lets every fallible operation in this module share the `io::Result` error type.
fn other_io_error(message: impl Into<String>) -> io::Error {
    let text: String = message.into();
    io::Error::other(text)
}
35
/// Filename of the runtime control-plane sidecar, stored alongside
/// `session.json` in each session directory. It holds a full session snapshot
/// with the `messages` history cleared.
const RUNTIME_SIDECAR_FILE: &str = "runtime.json";

/// Filename of the append-only per-LLM-call token-usage log, stored alongside
/// `session.json` in each session directory. One JSON line per call.
const TOKEN_USAGE_FILE: &str = "token-usage.jsonl";

/// Marker file (under `bamboo_home_dir`) recording that the one-shot runtime
/// sidecar migration has completed, so it is skipped on subsequent boots.
/// Only the marker's existence is checked, not its contents.
const RUNTIME_SIDECAR_MIGRATION_MARKER: &str = ".runtime_sidecar_migrated";
47
48/// Build the sidecar snapshot: the full session minus its `messages` history.
49/// Every field except `messages` is authoritative in the sidecar; on load the
50/// message history is taken back from `session.json`.
51fn runtime_sidecar_snapshot(session: &Session) -> Session {
52    let mut snapshot = session.clone();
53    snapshot.messages.clear();
54    snapshot
55}
56
57/// Overlay the runtime sidecar onto the session loaded from `session.json`.
58///
59/// The sidecar holds the freshest control-plane (metadata, `agent_runtime_state`,
60/// title group, …) because every save — full or runtime-only — writes it. The
61/// large `messages` history is only ever written by full saves into
62/// `session.json`, so it is preserved from `main`.
63fn overlay_runtime_sidecar(main: Session, sidecar: Option<Session>) -> Session {
64    match sidecar {
65        Some(mut side) => {
66            side.messages = main.messages;
67            side
68        }
69        None => main,
70    }
71}
72
73fn validate_session_id(session_id: &str) -> io::Result<()> {
74    if session_id.is_empty()
75        || session_id.contains('/')
76        || session_id.contains('\\')
77        || session_id.contains("..")
78    {
79        return Err(other_io_error(format!("invalid session id: {session_id}")));
80    }
81    Ok(())
82}
83
/// One row of the global `sessions.json` index.
///
/// Built from a full `Session` by `upsert_index_from_session`. It denormalizes
/// the fields that session lists and `cleanup` need, so those can be answered
/// without reading any `session.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionIndexEntry {
    /// Session id; also this entry's key in `SessionsIndex::sessions`.
    pub id: String,
    /// Root or child session; child sessions may themselves have child parents.
    pub kind: SessionKind,
    /// Path relative to `bamboo_home_dir` (e.g. "sessions/<id>" or "sessions/<root>/children/<id>").
    pub rel_path: String,
    /// Mirrored from `session.title`.
    pub title: String,
    /// Mirrored from `session.title_version`; 0 when absent on disk.
    #[serde(default)]
    pub title_version: u64,
    /// Mirrored from `session.pinned`. With `keep_pinned`, `cleanup` never
    /// deletes a pinned session.
    pub pinned: bool,
    /// Mirrored from `session.parent_session_id`; required (non-empty) for
    /// child sessions.
    pub parent_session_id: Option<String>,
    /// Mirrored from `session.root_session_id`: the root of this session's tree,
    /// whose directory holds all of the tree's children.
    pub root_session_id: String,
    /// Mirrored from `session.spawn_depth`.
    pub spawn_depth: u32,
    /// Mirrored from `session.model`; empty when absent on disk.
    #[serde(default)]
    pub model: String,
    /// Mirrored from `session.model_ref`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_ref: Option<ProviderModelRef>,
    /// Mirrored from `session.reasoning_effort`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<ReasoningEffort>,
    /// Raw session-level Gold config JSON mirrored from `session.metadata["gold_config"]`.
    /// Kept as a string here to avoid making infrastructure depend on bamboo-engine.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub gold_config_json: Option<String>,
    /// If the session was created by a schedule, store the schedule id here for fast filtering.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub created_by_schedule_id: Option<String>,
    /// If the session was created by a specific schedule run, keep the run id here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedule_run_id: Option<String>,
    /// Mirrored from `session.created_at`.
    pub created_at: DateTime<Utc>,
    /// Mirrored from `session.updated_at`; `list_index_entries` sorts newest first by it.
    pub updated_at: DateTime<Utc>,
    /// Currently always written equal to `session.updated_at`.
    pub last_activity_at: DateTime<Utc>,
    /// Number of messages at the last index update. `CleanupMode::Empty`
    /// treats sessions with at most one message as empty.
    pub message_count: usize,
    /// Whether the session's `attachments/` directory had at least one entry at
    /// the last index update.
    pub has_attachments: bool,
    /// Whether the session currently has a pending question awaiting user response.
    /// Mirrored into the index from `session.has_pending_question()` so the frontend
    /// can display the question dialog badge without loading session.json.
    #[serde(default)]
    pub has_pending_question: bool,
    /// Active plan mode runtime state mirrored into the index from
    /// `session.agent_runtime_state.plan_mode`, so lightweight session-list/detail
    /// APIs can surface plan mode without loading every session.json.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plan_mode: Option<bamboo_domain::PlanModeState>,
    /// Per-session "bypass permissions" toggle mirrored into the index from
    /// `session.agent_runtime_state.bypass_permissions`, so the session-list API
    /// can surface it without loading every session.json.
    #[serde(default)]
    pub bypass_permissions: bool,
    /// Last known run status for this session
    /// ("pending" | "running" | "completed" | "error" | "cancelled" | "skipped").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_status: Option<String>,
    /// Last known terminal error message, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_run_error: Option<String>,
    /// Last token usage information (updated after each LLM call).
    ///
    /// Stored in the global index so the frontend can display token usage without
    /// loading full session.json for every row.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub token_usage: Option<TokenBudgetUsage>,
    /// SubAgent profile id for child sessions spawned by `SubAgent.create`.
    /// Mirrored into the index from `session.metadata["subagent_type"]` so the
    /// frontend can render role badges (e.g. "general-purpose", "plan") on the
    /// child-session list without loading each session.json.
    /// `None` for root sessions and for legacy children created before this
    /// field was introduced.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subagent_type: Option<String>,
    /// Child lifecycle: `Some("resident")` for a reusable resident agent (a
    /// stable session reused for successive tasks); `None`/absent for the
    /// default one-shot child. Mirrored from `session.metadata["lifecycle"]`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub lifecycle: Option<String>,
    /// For a resident agent, the stable reuse key (scoped to `root_session_id`).
    /// Mirrored from `session.metadata["resident_name"]`; lets a later
    /// `SubAgent.create` find and reuse the resident without loading session.json.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resident_name: Option<String>,
}
165
/// The global session index, held in memory and persisted as `sessions.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionsIndex {
    /// Index format version (`SessionsIndex::empty` writes 2; not checked on load).
    pub version: u32,
    /// Timestamp of the most recent index write.
    pub updated_at: DateTime<Utc>,
    /// Index entries keyed by session id.
    pub sessions: HashMap<String, SessionIndexEntry>,
}
172
173impl SessionsIndex {
174    fn empty() -> Self {
175        Self {
176            version: 2,
177            updated_at: Utc::now(),
178            sessions: HashMap::new(),
179        }
180    }
181}
182
/// Folder-per-session store with a global JSON index (`sessions.json`).
///
/// The index is loaded into memory once by `new`. Index writes hold
/// `write_lock` and rewrite `sessions.json` through a temp file plus an atomic
/// rename.
#[derive(Debug)]
pub struct SessionStoreV2 {
    /// Root of all on-disk state (`sessions/`, `sessions.json`, `session_search.db`, …).
    bamboo_home_dir: PathBuf,
    /// `<bamboo_home_dir>/sessions`, holding one directory per root session.
    sessions_dir: PathBuf,
    /// `<bamboo_home_dir>/sessions.json`, the persisted global index.
    index_path: PathBuf,
    /// Search index backed by `<bamboo_home_dir>/session_search.db`.
    search_index: SessionSearchIndex,
    /// In-memory copy of `sessions.json`.
    index: RwLock<SessionsIndex>,
    /// Serializes on-disk index writes (and any multi-step operations that must be atomic-ish).
    write_lock: Mutex<()>,
}
193
194impl SessionStoreV2 {
195    pub async fn new(bamboo_home_dir: PathBuf) -> io::Result<Self> {
196        let sessions_dir = bamboo_home_dir.join("sessions");
197        let index_path = bamboo_home_dir.join("sessions.json");
198        let search_index = SessionSearchIndex::new(bamboo_home_dir.join("session_search.db"));
199
200        fs::create_dir_all(&sessions_dir).await?;
201        search_index.init().await?;
202
203        let index = if index_path.exists() {
204            let raw = fs::read_to_string(&index_path).await?;
205            serde_json::from_str(&raw)
206                .map_err(|e| other_io_error(format!("invalid sessions.json: {e}")))?
207        } else {
208            let index = SessionsIndex::empty();
209            // Persist immediately so "index is mandatory" holds from boot.
210            let tmp = index_path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
211            fs::write(
212                &tmp,
213                serde_json::to_vec_pretty(&index).map_err(|e| other_io_error(e.to_string()))?,
214            )
215            .await?;
216            atomic_rename(&tmp, &index_path).await?;
217            index
218        };
219
220        let storage = Self {
221            bamboo_home_dir,
222            sessions_dir,
223            index_path,
224            search_index,
225            index: RwLock::new(index),
226            write_lock: Mutex::new(()),
227        };
228
229        Ok(storage)
230    }
231
232    pub fn search_index(&self) -> &SessionSearchIndex {
233        &self.search_index
234    }
235
236    pub fn bamboo_home_dir(&self) -> &Path {
237        &self.bamboo_home_dir
238    }
239
240    pub fn index_path(&self) -> &Path {
241        &self.index_path
242    }
243
244    pub async fn rebuild_search_index(&self) -> io::Result<()> {
245        let session_ids = {
246            let index = self.index.read().await;
247            index.sessions.keys().cloned().collect::<Vec<_>>()
248        };
249        for session_id in session_ids {
250            if let Some(session) = self.load_session(&session_id).await? {
251                if !should_index_session(session.updated_at) {
252                    continue;
253                }
254                if let Err(error) = self.search_index.upsert_session(&session).await {
255                    tracing::warn!(
256                        "failed to rebuild search index entry for {}: {}",
257                        session_id,
258                        error
259                    );
260                }
261            }
262        }
263        Ok(())
264    }
265
266    pub fn sessions_root_dir(&self) -> &Path {
267        &self.sessions_dir
268    }
269
270    fn root_rel_path(session_id: &str) -> String {
271        format!("sessions/{session_id}")
272    }
273
274    fn child_rel_path(root_id: &str, child_id: &str) -> String {
275        format!("sessions/{root_id}/children/{child_id}")
276    }
277
278    fn abs_path_from_rel(&self, rel: &str) -> PathBuf {
279        self.bamboo_home_dir.join(rel)
280    }
281
282    async fn persist_index_locked(&self, index: &SessionsIndex) -> io::Result<()> {
283        let tmp = self
284            .index_path
285            .with_extension(format!("json.tmp.{}", Uuid::new_v4()));
286        let bytes = serde_json::to_vec_pretty(index).map_err(|e| other_io_error(e.to_string()))?;
287        fs::write(&tmp, bytes).await?;
288        atomic_rename(&tmp, &self.index_path).await?;
289        Ok(())
290    }
291
292    async fn update_index<F, T>(&self, f: F) -> io::Result<T>
293    where
294        F: FnOnce(&mut SessionsIndex) -> io::Result<T>,
295    {
296        let _guard = self.write_lock.lock().await;
297        let mut index = self.index.write().await;
298        let out = f(&mut index)?;
299        index.updated_at = Utc::now();
300        self.persist_index_locked(&index).await?;
301        Ok(out)
302    }
303
304    pub async fn list_index_entries(&self) -> Vec<SessionIndexEntry> {
305        let index = self.index.read().await;
306        let mut items: Vec<_> = index.sessions.values().cloned().collect();
307        items.sort_by_key(|b| std::cmp::Reverse(b.updated_at));
308        items
309    }
310
311    pub async fn get_index_entry(&self, session_id: &str) -> Option<SessionIndexEntry> {
312        let index = self.index.read().await;
313        index.sessions.get(session_id).cloned()
314    }
315
316    pub async fn resolve_rel_path(&self, session_id: &str) -> Option<String> {
317        self.get_index_entry(session_id).await.map(|e| e.rel_path)
318    }
319
320    async fn ensure_session_dirs(&self, session: &Session) -> io::Result<String> {
321        validate_session_id(&session.id)?;
322
323        let rel_path = match session.kind {
324            SessionKind::Root => Self::root_rel_path(&session.id),
325            SessionKind::Child => {
326                let root_id = session.root_session_id.trim();
327                let parent_id = session.parent_session_id.as_deref().unwrap_or("").trim();
328                if root_id.is_empty() || parent_id.is_empty() {
329                    return Err(other_io_error(
330                        "child session missing root_session_id/parent_session_id",
331                    ));
332                }
333                // Nesting is allowed: a child's parent may itself be a child.
334                // All descendants live flat under the tree root's directory
335                // (`child_rel_path` keys on `root_id`, which stays constant for
336                // the whole tree), so depth needs no path change.
337                validate_session_id(root_id)?;
338                Self::child_rel_path(root_id, &session.id)
339            }
340        };
341
342        let abs_dir = self.abs_path_from_rel(&rel_path);
343        fs::create_dir_all(&abs_dir).await?;
344        // Ensure expected subdirs (lazy; cheap).
345        fs::create_dir_all(abs_dir.join("attachments")).await?;
346        if session.kind == SessionKind::Root {
347            fs::create_dir_all(abs_dir.join("children")).await?;
348        }
349        Ok(rel_path)
350    }
351
352    async fn session_json_path(&self, session_id: &str) -> io::Result<Option<PathBuf>> {
353        if let Some(rel) = self.resolve_rel_path(session_id).await {
354            Ok(Some(self.abs_path_from_rel(&rel).join("session.json")))
355        } else {
356            Ok(None)
357        }
358    }
359
360    async fn runtime_json_path(&self, session_id: &str) -> io::Result<Option<PathBuf>> {
361        if let Some(rel) = self.resolve_rel_path(session_id).await {
362            Ok(Some(
363                self.abs_path_from_rel(&rel).join(RUNTIME_SIDECAR_FILE),
364            ))
365        } else {
366            Ok(None)
367        }
368    }
369
370    /// Write the runtime control-plane sidecar: a full session snapshot with the
371    /// (potentially huge) `messages` history cleared. This is what makes
372    /// runtime-only saves O(1) in conversation length.
373    async fn write_runtime_sidecar(&self, abs_dir: &Path, session: &Session) -> io::Result<()> {
374        let path = abs_dir.join(RUNTIME_SIDECAR_FILE);
375        let snapshot = runtime_sidecar_snapshot(session);
376        let tmp = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
377        let bytes =
378            serde_json::to_vec_pretty(&snapshot).map_err(|e| other_io_error(e.to_string()))?;
379        fs::write(&tmp, bytes).await?;
380        atomic_rename(&tmp, &path).await?;
381        Ok(())
382    }
383
384    /// One-shot migration: create the runtime sidecar (`runtime.json`) for every
385    /// existing session that predates the message/control-plane split.
386    ///
387    /// Loading already tolerates a missing sidecar (it falls back to the embedded
388    /// control-plane in `session.json`), so this is an *optimization* migration,
389    /// not a correctness one — but running it once means the fast runtime-save
390    /// path is in effect immediately for legacy sessions, and the denormalized
391    /// `children` id vectors (now `#[serde(skip)]`) drop out of the sidecar.
392    ///
393    /// Idempotent and cheap on later boots: guarded by a marker file, and any
394    /// session that already has a sidecar is skipped. Returns the number of
395    /// sidecars created.
396    pub async fn migrate_runtime_sidecars(&self) -> io::Result<usize> {
397        let marker = self.bamboo_home_dir.join(RUNTIME_SIDECAR_MIGRATION_MARKER);
398        if fs::try_exists(&marker).await.unwrap_or(false) {
399            return Ok(0);
400        }
401
402        let entries = self.list_index_entries().await;
403        let mut migrated = 0usize;
404        for entry in entries {
405            let abs_dir = self.abs_path_from_rel(&entry.rel_path);
406            let sidecar_path = abs_dir.join(RUNTIME_SIDECAR_FILE);
407            if fs::try_exists(&sidecar_path).await.unwrap_or(false) {
408                continue;
409            }
410            let session_path = abs_dir.join("session.json");
411            // Read session.json directly (not load_session) — there is no sidecar
412            // to overlay yet, and we want the raw embedded control-plane.
413            let raw = match fs::read_to_string(&session_path).await {
414                Ok(raw) => raw,
415                Err(error) if error.kind() == io::ErrorKind::NotFound => continue,
416                Err(error) => return Err(error),
417            };
418            let session: Session = match serde_json::from_str(&raw) {
419                Ok(session) => session,
420                Err(error) => {
421                    tracing::warn!(
422                        "runtime sidecar migration: skipping unreadable session {}: {}",
423                        entry.id,
424                        error
425                    );
426                    continue;
427                }
428            };
429            self.write_runtime_sidecar(&abs_dir, &session).await?;
430            migrated += 1;
431        }
432
433        // Persist the marker last, atomically, so an interrupted migration simply
434        // re-runs (it is idempotent) instead of being falsely marked complete.
435        let tmp = marker.with_extension(format!("tmp.{}", Uuid::new_v4()));
436        fs::write(&tmp, b"runtime-sidecar-v1\n").await?;
437        atomic_rename(&tmp, &marker).await?;
438
439        if migrated > 0 {
440            tracing::info!("runtime sidecar migration: created {migrated} sidecar(s)");
441        }
442        Ok(migrated)
443    }
444
445    /// Read the runtime sidecar (a Session snapshot with empty `messages`), if it
446    /// exists. Returns `None` when the session has no sidecar yet (e.g. legacy
447    /// sessions not yet migrated).
448    async fn read_runtime_sidecar(&self, session_id: &str) -> io::Result<Option<Session>> {
449        let Some(path) = self.runtime_json_path(session_id).await? else {
450            return Ok(None);
451        };
452        if !path.exists() {
453            return Ok(None);
454        }
455        let raw = fs::read_to_string(&path).await?;
456        match serde_json::from_str::<Session>(&raw) {
457            Ok(side) => Ok(Some(side)),
458            Err(error) => {
459                // A corrupt sidecar must never make a session unloadable — the
460                // authoritative copy still lives in session.json. Warn and ignore.
461                tracing::warn!(
462                    "ignoring corrupt runtime sidecar for {}: {}",
463                    session_id,
464                    error
465                );
466                Ok(None)
467            }
468        }
469    }
470
471    async fn attachments_dir(&self, session_id: &str) -> io::Result<Option<PathBuf>> {
472        if let Some(rel) = self.resolve_rel_path(session_id).await {
473            Ok(Some(self.abs_path_from_rel(&rel).join("attachments")))
474        } else {
475            Ok(None)
476        }
477    }
478
479    async fn compute_has_attachments(&self, session_id: &str) -> bool {
480        let Ok(Some(dir)) = self.attachments_dir(session_id).await else {
481            return false;
482        };
483        let Ok(mut rd) = fs::read_dir(dir).await else {
484            return false;
485        };
486        rd.next_entry().await.ok().flatten().is_some()
487    }
488
489    async fn upsert_index_from_session(
490        &self,
491        session: &Session,
492        rel_path: String,
493    ) -> io::Result<()> {
494        let has_attachments = self.compute_has_attachments(&session.id).await;
495        // Read the well-known runtime keys via the typed accessors, which prefer
496        // `runtime_metadata` and fall back to the legacy `metadata` strings.
497        let last_run_status = session
498            .last_run_status()
499            .filter(|value| !value.trim().is_empty());
500        let last_run_error = session
501            .last_run_error()
502            .filter(|value| !value.trim().is_empty());
503        let created_by_schedule_id = session
504            .metadata
505            .get("created_by_schedule_id")
506            .cloned()
507            .filter(|v| !v.trim().is_empty());
508        let schedule_run_id = session
509            .metadata
510            .get("schedule_run_id")
511            .cloned()
512            .filter(|v| !v.trim().is_empty());
513        let subagent_type = session.subagent_type().filter(|v| !v.trim().is_empty());
514        let lifecycle = session
515            .metadata
516            .get("lifecycle")
517            .cloned()
518            .filter(|v| !v.trim().is_empty());
519        let resident_name = session
520            .metadata
521            .get("resident_name")
522            .cloned()
523            .filter(|v| !v.trim().is_empty());
524        let gold_config_json = session
525            .metadata
526            .get("gold_config")
527            .cloned()
528            .filter(|v| !v.trim().is_empty());
529        let plan_mode = session
530            .agent_runtime_state
531            .as_ref()
532            .and_then(|state| state.plan_mode.clone());
533        let bypass_permissions = session
534            .agent_runtime_state
535            .as_ref()
536            .is_some_and(|state| state.bypass_permissions);
537        self.update_index(|index| {
538            index.sessions.insert(
539                session.id.clone(),
540                SessionIndexEntry {
541                    id: session.id.clone(),
542                    kind: session.kind,
543                    rel_path,
544                    title: session.title.clone(),
545                    title_version: session.title_version,
546                    pinned: session.pinned,
547                    parent_session_id: session.parent_session_id.clone(),
548                    root_session_id: session.root_session_id.clone(),
549                    spawn_depth: session.spawn_depth,
550                    model: session.model.clone(),
551                    model_ref: session.model_ref.clone(),
552                    reasoning_effort: session.reasoning_effort,
553                    gold_config_json,
554                    created_by_schedule_id,
555                    schedule_run_id,
556                    created_at: session.created_at,
557                    updated_at: session.updated_at,
558                    last_activity_at: session.updated_at,
559                    message_count: session.messages.len(),
560                    has_attachments,
561                    has_pending_question: session.has_pending_question(),
562                    plan_mode,
563                    bypass_permissions,
564                    last_run_status,
565                    last_run_error,
566                    token_usage: session.token_usage.clone(),
567                    subagent_type,
568                    lifecycle,
569                    resident_name,
570                },
571            );
572            Ok(())
573        })
574        .await?;
575        Ok(())
576    }
577
578    pub async fn write_image_attachment(
579        &self,
580        session: &Session,
581        raw_base64_or_data_url: &str,
582        mime_hint: Option<&str>,
583    ) -> io::Result<(String, String)> {
584        let (mime, base64_data) =
585            parse_data_url_base64(raw_base64_or_data_url).unwrap_or_else(|| {
586                (
587                    mime_hint.unwrap_or("image/png").trim().to_string(),
588                    raw_base64_or_data_url.trim().to_string(),
589                )
590            });
591
592        let bytes = base64::engine::general_purpose::STANDARD
593            .decode(base64_data.as_bytes())
594            .map_err(|e| other_io_error(format!("invalid base64 image data: {e}")))?;
595
596        let attachment_id = Uuid::new_v4().to_string();
597        let ext = mime_to_extension(mime.as_str()).unwrap_or("bin");
598
599        let rel_path = self.ensure_session_dirs(session).await?;
600        let abs_dir = self.abs_path_from_rel(&rel_path);
601        let attachments_dir = abs_dir.join("attachments");
602        fs::create_dir_all(&attachments_dir).await?;
603
604        let path = attachments_dir.join(format!("{attachment_id}.{ext}"));
605        let tmp = path.with_extension(format!("{ext}.tmp.{}", Uuid::new_v4()));
606        fs::write(&tmp, &bytes).await?;
607        atomic_rename(&tmp, &path).await?;
608
609        Ok((
610            attachment_id.clone(),
611            format!("bamboo-attachment://{}/{}", session.id, attachment_id),
612        ))
613    }
614
615    /// Read an attachment by id, returning bytes + inferred MIME.
616    pub async fn read_attachment(
617        &self,
618        session_id: &str,
619        attachment_id: &str,
620    ) -> io::Result<Option<(Vec<u8>, String)>> {
621        validate_session_id(session_id)?;
622        validate_session_id(attachment_id)?;
623        let Some(dir) = self.attachments_dir(session_id).await? else {
624            return Ok(None);
625        };
626        if !dir.exists() {
627            return Ok(None);
628        }
629
630        let mut rd = fs::read_dir(&dir).await?;
631        while let Some(entry) = rd.next_entry().await? {
632            let file_name = entry.file_name();
633            let file_name = file_name.to_string_lossy();
634            if !file_name.starts_with(attachment_id) {
635                continue;
636            }
637            // Match "<id>.<ext>"
638            if file_name.len() <= attachment_id.len() + 1
639                || !file_name.as_bytes()[attachment_id.len()].eq(&b'.')
640            {
641                continue;
642            }
643            let ext = file_name.split('.').next_back().unwrap_or("bin");
644            let mime = extension_to_mime(ext).unwrap_or("application/octet-stream");
645            let bytes = fs::read(entry.path()).await?;
646            return Ok(Some((bytes, mime.to_string())));
647        }
648
649        Ok(None)
650    }
651
652    pub async fn clear_session(&self, session_id: &str) -> io::Result<bool> {
653        let Some(mut session) = self.load_session(session_id).await? else {
654            return Ok(false);
655        };
656
657        // Keep only the first System message if present; drop all other messages.
658        let system_msg = session
659            .messages
660            .iter()
661            .find(|m| matches!(m.role, Role::System))
662            .cloned();
663        session.messages.clear();
664        if let Some(system) = system_msg {
665            session.messages.push(system);
666        }
667
668        // Clearing history invalidates derived context state.
669        session.token_usage = None;
670        session.conversation_summary = None;
671        session.updated_at = Utc::now();
672
673        // Remove attachments on disk.
674        if let Ok(Some(dir)) = self.attachments_dir(session_id).await {
675            let _ = fs::remove_dir_all(&dir).await;
676            let _ = fs::create_dir_all(&dir).await;
677        }
678
679        self.save_session(&session).await?;
680        Ok(true)
681    }
682
683    pub async fn cleanup(&self, mode: CleanupMode, keep_pinned: bool) -> io::Result<CleanupResult> {
684        // All decisions are index-only.
685        let entries = {
686            self.index
687                .read()
688                .await
689                .sessions
690                .values()
691                .cloned()
692                .collect::<Vec<_>>()
693        };
694
695        let pinned_child_roots: HashSet<String> = if keep_pinned {
696            entries
697                .iter()
698                .filter(|e| e.kind == SessionKind::Child && e.pinned)
699                .filter_map(|e| e.parent_session_id.clone())
700                .collect()
701        } else {
702            HashSet::new()
703        };
704
705        // Helper to decide whether an entry is protected.
706        let is_protected = |e: &SessionIndexEntry| -> bool {
707            if !keep_pinned {
708                return false;
709            }
710            if e.pinned {
711                return true;
712            }
713            // A root with pinned child cannot be deleted.
714            if e.kind == SessionKind::Root && pinned_child_roots.contains(&e.id) {
715                return true;
716            }
717            false
718        };
719
720        // Determine deletions as a set of session ids (roots and/or children).
721        let mut delete_child_ids = HashSet::<String>::new();
722        let mut delete_root_ids = HashSet::<String>::new();
723
724        match mode {
725            CleanupMode::Children => {
726                for e in entries.iter().filter(|e| e.kind == SessionKind::Child) {
727                    if is_protected(e) {
728                        continue;
729                    }
730                    delete_child_ids.insert(e.id.clone());
731                }
732            }
733            CleanupMode::All | CleanupMode::Empty => {
734                // First decide which roots can be deleted.
735                for root in entries.iter().filter(|e| e.kind == SessionKind::Root) {
736                    if is_protected(root) {
737                        continue;
738                    }
739                    if mode == CleanupMode::Empty && root.message_count > 1 {
740                        continue;
741                    }
742                    delete_root_ids.insert(root.id.clone());
743                }
744
745                // For roots we keep, we may still delete some children (e.g., unpinned, or empty).
746                for child in entries.iter().filter(|e| e.kind == SessionKind::Child) {
747                    if delete_root_ids.contains(&child.root_session_id) {
748                        continue; // will be deleted with root.
749                    }
750                    if is_protected(child) {
751                        continue;
752                    }
753                    if mode == CleanupMode::Empty && child.message_count > 1 {
754                        continue;
755                    }
756                    delete_child_ids.insert(child.id.clone());
757                }
758            }
759        }
760
761        // Pre-compute full deleted id set for a truthful response payload.
762        let mut deleted_ids = HashSet::<String>::new();
763        for root_id in delete_root_ids.iter() {
764            for e in entries.iter().filter(|e| e.root_session_id == *root_id) {
765                deleted_ids.insert(e.id.clone());
766            }
767        }
768        for child_id in delete_child_ids.iter() {
769            deleted_ids.insert(child_id.clone());
770        }
771
772        // Apply deletions (roots first; they delete children implicitly).
773        for root_id in delete_root_ids.iter() {
774            let _ = self.delete_session_recursive(root_id, true).await?;
775        }
776        for child_id in delete_child_ids.iter() {
777            let _ = self.delete_session_recursive(child_id, true).await?;
778        }
779        let mut deleted_session_ids: Vec<String> = deleted_ids.into_iter().collect();
780        deleted_session_ids.sort();
781        Ok(CleanupResult {
782            deleted_count: deleted_session_ids.len(),
783            deleted_session_ids,
784        })
785    }
786
787    /// Development-only: hard reset all sessions and the index.
788    ///
789    /// This is the supported "greenfield" mechanism. It deletes:
790    /// - `bamboo_home_dir/sessions/`
791    /// - `bamboo_home_dir/sessions.json` (rewritten to empty index)
792    pub async fn dev_reset(&self) -> io::Result<()> {
793        let _guard = self.write_lock.lock().await;
794
795        // Remove the sessions directory entirely.
796        let _ = fs::remove_dir_all(&self.sessions_dir).await;
797        fs::create_dir_all(&self.sessions_dir).await?;
798
799        // Reset in-memory index and persist.
800        {
801            let mut index = self.index.write().await;
802            *index = SessionsIndex::empty();
803            self.persist_index_locked(&index).await?;
804        }
805
806        Ok(())
807    }
808
809    /// Delete a session. If the session is a root, deletes its entire directory (and all child sessions).
810    /// If the session is a child, deletes only that child directory.
811    ///
812    /// `force=true` ignores pinned protection; callers must enforce confirmations at the API/UI layer.
813    pub async fn delete_session_recursive(
814        &self,
815        session_id: &str,
816        force: bool,
817    ) -> io::Result<bool> {
818        let entry = self.get_index_entry(session_id).await;
819        let Some(entry) = entry else {
820            return Ok(false);
821        };
822
823        if !force && entry.pinned {
824            return Err(other_io_error(
825                "refusing to delete pinned session without force",
826            ));
827        }
828
829        match entry.kind {
830            SessionKind::Child => {
831                let abs_dir = self.abs_path_from_rel(&entry.rel_path);
832                let _ = fs::remove_dir_all(&abs_dir).await;
833                self.update_index(|index| {
834                    index.sessions.remove(session_id);
835                    Ok(())
836                })
837                .await?;
838                if let Err(error) = self.search_index.delete_session(session_id).await {
839                    tracing::warn!(
840                        "failed to delete session search index row for {}: {}",
841                        session_id,
842                        error
843                    );
844                }
845                Ok(true)
846            }
847            SessionKind::Root => {
848                let root_id = entry.id.clone();
849                let abs_dir = self.abs_path_from_rel(&entry.rel_path);
850                let _ = fs::remove_dir_all(&abs_dir).await;
851
852                let to_remove_ids = {
853                    let index = self.index.read().await;
854                    index
855                        .sessions
856                        .values()
857                        .filter(|e| e.root_session_id == root_id)
858                        .map(|e| e.id.clone())
859                        .collect::<Vec<_>>()
860                };
861
862                self.update_index(|index| {
863                    for id in &to_remove_ids {
864                        index.sessions.remove(id);
865                    }
866                    Ok(())
867                })
868                .await?;
869
870                for id in to_remove_ids {
871                    if let Err(error) = self.search_index.delete_session(&id).await {
872                        tracing::warn!(
873                            "failed to delete session search index row for {}: {}",
874                            id,
875                            error
876                        );
877                    }
878                }
879                Ok(true)
880            }
881        }
882    }
883}
884
/// Selects which sessions a cleanup pass may delete.
///
/// Whenever a root is deleted, all of its descendants go with it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CleanupMode {
    /// Every unprotected root, plus every unprotected child of a root that is kept.
    All,
    /// Same selection as `All`, restricted to sessions with at most one message.
    Empty,
    /// Only unprotected child sessions; roots are never deleted.
    Children,
}
891
892#[derive(Debug, Clone, Serialize, Deserialize)]
893pub struct CleanupResult {
894    pub deleted_count: usize,
895    pub deleted_session_ids: Vec<String>,
896}
897
898async fn atomic_rename(from: &Path, to: &Path) -> io::Result<()> {
899    // Best-effort atomic on Unix. On Windows, rename cannot overwrite.
900    match fs::rename(from, to).await {
901        Ok(()) => Ok(()),
902        Err(err) => {
903            if to.exists() {
904                let _ = fs::remove_file(to).await;
905            }
906            fs::rename(from, to).await.map_err(|e| {
907                other_io_error(format!(
908                    "failed to rename {:?} -> {:?}: {} (original: {})",
909                    from, to, e, err
910                ))
911            })
912        }
913    }
914}
915
/// Split a base64 `data:` URL into `(mime, payload)`.
///
/// Accepts `data:<mime>[;<param>]*;base64,<data>`. The `data:` scheme and the
/// `base64` parameter are matched ASCII case-insensitively, and `base64` must
/// be a whole `;`-separated parameter (not a prefix of a longer token).
///
/// The mime is returned trimmed but otherwise verbatim; it may be empty, as in
/// `data:;base64,...`. The payload is trimmed but neither decoded nor validated.
///
/// Returns `None` for non-`data:` URLs, non-base64 data URLs
/// (e.g. `data:text/plain,hi`), and inputs with no `,` separator.
fn parse_data_url_base64(url: &str) -> Option<(String, String)> {
    const SCHEME: &str = "data:";

    let trimmed = url.trim();
    // `get` rather than slicing: it yields None instead of panicking when the
    // input is shorter than the scheme or byte 5 is not a char boundary.
    let scheme = trimmed.get(..SCHEME.len())?;
    if !scheme.eq_ignore_ascii_case(SCHEME) {
        return None;
    }
    let rest = &trimmed[SCHEME.len()..];

    let (header, data) = rest.split_once(',')?;
    let mut params = header.split(';');
    // `split` always yields at least one item, which is the media type.
    let mime = params.next()?.trim().to_string();
    if !params.any(|param| param.trim().eq_ignore_ascii_case("base64")) {
        return None;
    }
    Some((mime, data.trim().to_string()))
}
930
/// Return the file extension for a supported image MIME type.
///
/// Surrounding whitespace and ASCII case are ignored. Supported types are
/// png, jpeg (mapped to `jpg`), webp, gif and bmp; anything else is `None`.
fn mime_to_extension(mime: &str) -> Option<&'static str> {
    let normalized = mime.trim().to_ascii_lowercase();
    let extension = match normalized.as_str() {
        "image/png" => "png",
        "image/jpeg" => "jpg",
        "image/webp" => "webp",
        "image/gif" => "gif",
        "image/bmp" => "bmp",
        _ => return None,
    };
    Some(extension)
}
941
/// Return the image MIME type for a file extension (without the leading dot).
///
/// Surrounding whitespace and ASCII case are ignored. Both `jpg` and `jpeg`
/// map to `image/jpeg`. Unsupported extensions yield `None`.
fn extension_to_mime(ext: &str) -> Option<&'static str> {
    let key = ext.trim().to_ascii_lowercase();
    let mime = match key.as_str() {
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "webp" => "image/webp",
        "gif" => "image/gif",
        "bmp" => "image/bmp",
        _ => return None,
    };
    Some(mime)
}
952
953#[async_trait::async_trait]
954impl Storage for SessionStoreV2 {
955    async fn save_session(&self, session: &Session) -> io::Result<()> {
956        let rel_path = self.ensure_session_dirs(session).await?;
957        let abs_dir = self.abs_path_from_rel(&rel_path);
958        let path = abs_dir.join("session.json");
959
960        // Refresh the runtime sidecar BEFORE session.json. If the process
961        // crashes between the two writes, the sidecar then carries a
962        // control-plane that is at least as fresh as session.json, and the
963        // load-time overlay (sidecar wins for non-message fields) stays correct.
964        // Writing session.json first could leave a stale sidecar that silently
965        // reverts the just-saved control-plane on the next load.
966        self.write_runtime_sidecar(&abs_dir, session).await?;
967
968        let tmp = path.with_extension(format!("json.tmp.{}", Uuid::new_v4()));
969        let bytes =
970            serde_json::to_vec_pretty(session).map_err(|e| other_io_error(e.to_string()))?;
971        fs::write(&tmp, bytes).await?;
972        atomic_rename(&tmp, &path).await?;
973
974        self.upsert_index_from_session(session, rel_path).await?;
975        if let Err(error) = self.search_index.upsert_session(session).await {
976            tracing::warn!(
977                "failed to update session search index for {}: {}",
978                session.id,
979                error
980            );
981        }
982        Ok(())
983    }
984
985    async fn load_session(&self, session_id: &str) -> io::Result<Option<Session>> {
986        validate_session_id(session_id)?;
987        let Some(path) = self.session_json_path(session_id).await? else {
988            return Ok(None);
989        };
990        if !path.exists() {
991            return Ok(None);
992        }
993        let raw = fs::read_to_string(path).await?;
994        let session: Session = serde_json::from_str(&raw)
995            .map_err(|e| other_io_error(format!("invalid session.json: {e}")))?;
996        let sidecar = self.read_runtime_sidecar(session_id).await?;
997        Ok(Some(overlay_runtime_sidecar(session, sidecar)))
998    }
999
1000    async fn delete_session(&self, session_id: &str) -> io::Result<bool> {
1001        // Historical API deletes sessions. In V2, treat this as recursive and forced.
1002        self.delete_session_recursive(session_id, true).await
1003    }
1004
1005    async fn save_runtime_state(&self, session: &Session) -> io::Result<()> {
1006        // Fast path: write ONLY the small runtime sidecar (no messages), leaving
1007        // session.json — which carries the full conversation history — untouched.
1008        // This is O(1) in conversation length, unlike `save_session`.
1009        let Some(rel) = self.resolve_rel_path(&session.id).await else {
1010            // Session was never fully persisted yet — fall back to a full save so
1011            // session.json and the index get created.
1012            return self.save_session(session).await;
1013        };
1014        let abs_dir = self.abs_path_from_rel(&rel);
1015        self.write_runtime_sidecar(&abs_dir, session).await
1016    }
1017
1018    async fn load_runtime_control_plane(&self, session_id: &str) -> io::Result<Option<Session>> {
1019        validate_session_id(session_id)?;
1020        // Prefer the sidecar (cheap: no messages). Fall back to a full load for
1021        // sessions that predate the sidecar (not yet migrated).
1022        if let Some(side) = self.read_runtime_sidecar(session_id).await? {
1023            return Ok(Some(side));
1024        }
1025        self.load_session(session_id).await
1026    }
1027
1028    async fn list_child_run_statuses(
1029        &self,
1030        parent_session_id: &str,
1031    ) -> io::Result<Vec<(String, Option<String>)>> {
1032        let index = self.index.read().await;
1033        Ok(index
1034            .sessions
1035            .values()
1036            .filter(|entry| {
1037                entry.kind == SessionKind::Child
1038                    && entry.parent_session_id.as_deref() == Some(parent_session_id)
1039            })
1040            .map(|entry| (entry.id.clone(), entry.last_run_status.clone()))
1041            .collect())
1042    }
1043
1044    async fn append_token_usage_record(&self, session_id: &str, json_line: &str) -> io::Result<()> {
1045        use tokio::io::AsyncWriteExt;
1046
1047        validate_session_id(session_id)?;
1048        // Resolve the session's own directory. If it isn't indexed yet (no
1049        // initial save has happened), skip silently — this is an analysis
1050        // sidecar, never authoritative state.
1051        let Some(rel) = self.resolve_rel_path(session_id).await else {
1052            return Ok(());
1053        };
1054        let path = self.abs_path_from_rel(&rel).join(TOKEN_USAGE_FILE);
1055
1056        // Exactly one line per record, regardless of how the caller framed it.
1057        let mut line = json_line.trim_end_matches('\n').to_string();
1058        line.push('\n');
1059
1060        let mut file = fs::OpenOptions::new()
1061            .create(true)
1062            .append(true)
1063            .open(&path)
1064            .await?;
1065        file.write_all(line.as_bytes()).await?;
1066        Ok(())
1067    }
1068}
1069
1070#[async_trait::async_trait]
1071impl AttachmentReader for SessionStoreV2 {
1072    async fn read_attachment(
1073        &self,
1074        session_id: &str,
1075        attachment_id: &str,
1076    ) -> io::Result<Option<(Vec<u8>, String)>> {
1077        SessionStoreV2::read_attachment(self, session_id, attachment_id).await
1078    }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::*;
1084    use std::io;
1085    use tempfile::TempDir;
1086
1087    async fn create_temp_storage() -> io::Result<(SessionStoreV2, TempDir)> {
1088        let temp_dir = TempDir::new().map_err(io::Error::other)?;
1089        let bamboo_home = temp_dir.path().to_path_buf();
1090        let storage = SessionStoreV2::new(bamboo_home).await?;
1091        Ok((storage, temp_dir))
1092    }
1093
1094    #[tokio::test]
1095    async fn test_new_creates_sessions_directory() -> io::Result<()> {
1096        let temp_dir = TempDir::new().map_err(io::Error::other)?;
1097        let bamboo_home = temp_dir.path().to_path_buf();
1098        let sessions_dir = bamboo_home.join("sessions");
1099
1100        assert!(!sessions_dir.exists());
1101        let _storage = SessionStoreV2::new(bamboo_home).await?;
1102        assert!(sessions_dir.exists());
1103
1104        Ok(())
1105    }
1106
1107    #[tokio::test]
1108    async fn test_new_creates_index_file() -> io::Result<()> {
1109        let temp_dir = TempDir::new().map_err(io::Error::other)?;
1110        let bamboo_home = temp_dir.path().to_path_buf();
1111        let index_path = bamboo_home.join("sessions.json");
1112
1113        assert!(!index_path.exists());
1114        let _storage = SessionStoreV2::new(bamboo_home).await?;
1115        assert!(index_path.exists());
1116
1117        Ok(())
1118    }
1119
1120    // ── Runtime sidecar (③) ───────────────────────────────────────────────
1121
1122    use bamboo_domain::session::types::Message;
1123    use bamboo_domain::AgentRuntimeState;
1124
1125    fn session_with_history(id: &str, messages: usize, run_id: &str) -> Session {
1126        let mut s = Session::new(id.to_string(), "test-model".to_string());
1127        for i in 0..messages {
1128            s.add_message(Message::user(format!("msg-{i}")));
1129        }
1130        s.agent_runtime_state = Some(AgentRuntimeState::new(run_id));
1131        s
1132    }
1133
1134    async fn read_session_json_raw(storage: &SessionStoreV2, id: &str) -> String {
1135        let path = storage.session_json_path(id).await.unwrap().unwrap();
1136        tokio::fs::read_to_string(path).await.unwrap()
1137    }
1138
1139    #[tokio::test]
1140    async fn append_token_usage_record_writes_jsonl_in_session_dir() -> io::Result<()> {
1141        let (storage, _t) = create_temp_storage().await?;
1142        let s = session_with_history("tu-1", 1, "run-A");
1143        storage.save_session(&s).await?;
1144
1145        storage
1146            .append_token_usage_record("tu-1", r#"{"round":1,"cache_read_input_tokens":0}"#)
1147            .await?;
1148        // A trailing newline in the caller's line must not produce a blank line.
1149        storage
1150            .append_token_usage_record("tu-1", "{\"round\":2,\"cache_read_input_tokens\":9000}\n")
1151            .await?;
1152
1153        let rel = storage.resolve_rel_path("tu-1").await.unwrap();
1154        let path = storage.abs_path_from_rel(&rel).join(TOKEN_USAGE_FILE);
1155        assert!(
1156            path.exists(),
1157            "token-usage.jsonl should sit in the session dir"
1158        );
1159
1160        let contents = tokio::fs::read_to_string(&path).await?;
1161        let lines: Vec<&str> = contents.lines().collect();
1162        assert_eq!(lines.len(), 2, "one line per appended record");
1163        assert!(lines[0].contains("\"round\":1"));
1164        assert!(lines[1].contains("\"round\":2"));
1165        // Each line is valid standalone JSON.
1166        for line in lines {
1167            serde_json::from_str::<serde_json::Value>(line).expect("each line is valid JSON");
1168        }
1169        Ok(())
1170    }
1171
1172    #[tokio::test]
1173    async fn append_token_usage_record_is_noop_for_unindexed_session() -> io::Result<()> {
1174        let (storage, _t) = create_temp_storage().await?;
1175        // No save_session → not indexed yet. Must not error, must not create a file.
1176        storage
1177            .append_token_usage_record("never-saved", r#"{"round":1}"#)
1178            .await?;
1179        assert!(storage.resolve_rel_path("never-saved").await.is_none());
1180        Ok(())
1181    }
1182
1183    #[tokio::test]
1184    async fn save_session_writes_runtime_sidecar() -> io::Result<()> {
1185        let (storage, _t) = create_temp_storage().await?;
1186        let s = session_with_history("sc-1", 2, "run-A");
1187        storage.save_session(&s).await?;
1188
1189        let sidecar_path = storage.runtime_json_path("sc-1").await?.unwrap();
1190        assert!(
1191            sidecar_path.exists(),
1192            "save_session must write runtime.json"
1193        );
1194
1195        // Sidecar must NOT carry the message history.
1196        let side = storage.read_runtime_sidecar("sc-1").await?.unwrap();
1197        assert!(side.messages.is_empty(), "sidecar messages must be cleared");
1198        assert_eq!(side.agent_runtime_state.as_ref().unwrap().run_id, "run-A");
1199        Ok(())
1200    }
1201
1202    #[tokio::test]
1203    async fn save_runtime_state_does_not_rewrite_session_json_messages() -> io::Result<()> {
1204        let (storage, _t) = create_temp_storage().await?;
1205
1206        // Full save: 3 messages + run-A.
1207        let s = session_with_history("sc-2", 3, "run-A");
1208        storage.save_session(&s).await?;
1209        let raw_before = read_session_json_raw(&storage, "sc-2").await;
1210        assert!(raw_before.contains("msg-2"));
1211
1212        // Runtime-only save: bump control-plane to run-B AND (deviously) add a
1213        // 4th in-memory message. The sidecar must persist run-B but IGNORE the
1214        // message, and session.json must be left byte-identical.
1215        let mut s2 = s.clone();
1216        s2.agent_runtime_state = Some(AgentRuntimeState::new("run-B"));
1217        s2.add_message(Message::user("msg-3-should-not-persist"));
1218        storage.save_runtime_state(&s2).await?;
1219
1220        let raw_after = read_session_json_raw(&storage, "sc-2").await;
1221        assert_eq!(
1222            raw_before, raw_after,
1223            "save_runtime_state must not touch session.json"
1224        );
1225
1226        // Load overlays the sidecar: run-B control-plane + original 3 messages.
1227        let loaded = storage.load_session("sc-2").await?.unwrap();
1228        assert_eq!(loaded.agent_runtime_state.as_ref().unwrap().run_id, "run-B");
1229        assert_eq!(
1230            loaded.messages.len(),
1231            3,
1232            "runtime-only save must not add a message"
1233        );
1234        Ok(())
1235    }
1236
1237    #[tokio::test]
1238    async fn save_runtime_state_falls_back_to_full_save_when_unpersisted() -> io::Result<()> {
1239        let (storage, _t) = create_temp_storage().await?;
1240        // Session was never saved: no index entry, no dir. save_runtime_state
1241        // must fall back to a full save so session.json + index get created.
1242        let s = session_with_history("sc-3", 1, "run-A");
1243        storage.save_runtime_state(&s).await?;
1244
1245        let loaded = storage.load_session("sc-3").await?;
1246        assert!(
1247            loaded.is_some(),
1248            "fallback full save must create the session"
1249        );
1250        assert_eq!(loaded.unwrap().messages.len(), 1);
1251        Ok(())
1252    }
1253
1254    #[tokio::test]
1255    async fn corrupt_sidecar_is_ignored_and_session_still_loads() -> io::Result<()> {
1256        let (storage, _t) = create_temp_storage().await?;
1257        let s = session_with_history("sc-4", 2, "run-A");
1258        storage.save_session(&s).await?;
1259
1260        // Corrupt the sidecar.
1261        let sidecar_path = storage.runtime_json_path("sc-4").await?.unwrap();
1262        tokio::fs::write(&sidecar_path, b"{ not valid json").await?;
1263
1264        // Session still loads from session.json; corrupt sidecar is ignored.
1265        let loaded = storage.load_session("sc-4").await?.unwrap();
1266        assert_eq!(loaded.messages.len(), 2);
1267        assert_eq!(loaded.agent_runtime_state.as_ref().unwrap().run_id, "run-A");
1268        Ok(())
1269    }
1270
1271    // ── ⑤ Runtime sidecar migration ──────────────────────────────────────
1272
1273    #[tokio::test]
1274    async fn migration_backfills_sidecars_for_legacy_sessions() -> io::Result<()> {
1275        let temp_dir = TempDir::new().map_err(io::Error::other)?;
1276        let bamboo_home = temp_dir.path().to_path_buf();
1277        let storage = SessionStoreV2::new(bamboo_home.clone()).await?;
1278
1279        // Persist two sessions, then delete their sidecars to simulate the
1280        // legacy on-disk layout (session.json only).
1281        let a = session_with_history("mig-a", 3, "run-A");
1282        let b = session_with_history("mig-b", 1, "run-B");
1283        storage.save_session(&a).await?;
1284        storage.save_session(&b).await?;
1285        for id in ["mig-a", "mig-b"] {
1286            let sidecar = storage.runtime_json_path(id).await?.unwrap();
1287            tokio::fs::remove_file(&sidecar).await?;
1288            assert!(!sidecar.exists());
1289        }
1290
1291        let migrated = storage.migrate_runtime_sidecars().await?;
1292        assert_eq!(migrated, 2, "both legacy sessions get a sidecar");
1293
1294        // Sidecars now exist and carry the control-plane (no messages).
1295        for (id, run) in [("mig-a", "run-A"), ("mig-b", "run-B")] {
1296            let side = storage.read_runtime_sidecar(id).await?.unwrap();
1297            assert!(side.messages.is_empty());
1298            assert_eq!(side.agent_runtime_state.as_ref().unwrap().run_id, run);
1299        }
1300        // Full load still returns the messages from session.json.
1301        assert_eq!(
1302            storage.load_session("mig-a").await?.unwrap().messages.len(),
1303            3
1304        );
1305
1306        // Marker written; a second run is a no-op.
1307        let marker = bamboo_home.join(RUNTIME_SIDECAR_MIGRATION_MARKER);
1308        assert!(marker.exists());
1309        assert_eq!(storage.migrate_runtime_sidecars().await?, 0);
1310        Ok(())
1311    }
1312
1313    #[tokio::test]
1314    async fn migration_is_idempotent_and_skips_existing_sidecars() -> io::Result<()> {
1315        let (storage, _t) = create_temp_storage().await?;
1316        // Fresh save already writes a sidecar — migration must not double-count it.
1317        storage
1318            .save_session(&session_with_history("mig-c", 2, "run-C"))
1319            .await?;
1320        let first = storage.migrate_runtime_sidecars().await?;
1321        assert_eq!(first, 0, "session saved in new format needs no migration");
1322        // And a re-run remains a no-op.
1323        assert_eq!(storage.migrate_runtime_sidecars().await?, 0);
1324        Ok(())
1325    }
1326
1327    #[tokio::test]
1328    async fn migration_drops_legacy_denormalized_children_from_sidecar() -> io::Result<()> {
1329        // A legacy session.json whose embedded runtime state still carries the
1330        // old denormalized children id vectors. After migration the sidecar must
1331        // not contain them (they are now derived from the index).
1332        let (storage, _t) = create_temp_storage().await?;
1333        let mut s = session_with_history("mig-legacy", 1, "run-L");
1334        storage.save_session(&s).await?;
1335
1336        // Hand-write a legacy session.json containing children.active_ids and
1337        // remove the sidecar, simulating pre-split on-disk data.
1338        let dir = storage.abs_path_from_rel(&storage.resolve_rel_path("mig-legacy").await.unwrap());
1339        s.agent_runtime_state = Some(AgentRuntimeState::new("run-L"));
1340        let mut value = serde_json::to_value(&s).unwrap();
1341        value["agent_runtime_state"]["children"]["active_ids"] = serde_json::json!(["ghost-child"]);
1342        tokio::fs::write(
1343            dir.join("session.json"),
1344            serde_json::to_vec_pretty(&value).unwrap(),
1345        )
1346        .await?;
1347        tokio::fs::remove_file(storage.runtime_json_path("mig-legacy").await?.unwrap()).await?;
1348
1349        assert_eq!(storage.migrate_runtime_sidecars().await?, 1);
1350
1351        let raw_sidecar =
1352            tokio::fs::read_to_string(storage.runtime_json_path("mig-legacy").await?.unwrap())
1353                .await?;
1354        assert!(
1355            !raw_sidecar.contains("ghost-child") && !raw_sidecar.contains("active_ids"),
1356            "legacy denormalized children must not survive migration: {raw_sidecar}"
1357        );
1358        Ok(())
1359    }
1360
1361    #[tokio::test]
1362    async fn list_child_run_statuses_filters_by_parent_and_reports_status() -> io::Result<()> {
1363        let (storage, _t) = create_temp_storage().await?;
1364
1365        // Parent root + two children with distinct statuses, plus an unrelated
1366        // child under a different parent that must NOT appear.
1367        let parent = Session::new("p-root".to_string(), "m".to_string());
1368        storage.save_session(&parent).await?;
1369        let other = Session::new("p-other".to_string(), "m".to_string());
1370        storage.save_session(&other).await?;
1371
1372        let mut c1 = Session::new_child("ch-done", "p-root", "m", "c1");
1373        c1.metadata
1374            .insert("last_run_status".to_string(), "completed".to_string());
1375        storage.save_session(&c1).await?;
1376
1377        let c2 = Session::new_child("ch-pending", "p-root", "m", "c2");
1378        storage.save_session(&c2).await?;
1379
1380        let foreign = Session::new_child("ch-foreign", "p-other", "m", "x");
1381        storage.save_session(&foreign).await?;
1382
1383        let mut got = storage.list_child_run_statuses("p-root").await?;
1384        got.sort_by(|a, b| a.0.cmp(&b.0));
1385        assert_eq!(got.len(), 2, "only p-root's children: {got:?}");
1386        assert_eq!(got[0].0, "ch-done");
1387        assert_eq!(got[0].1.as_deref(), Some("completed"));
1388        assert_eq!(got[1].0, "ch-pending");
1389        // pending child has no terminal status mirrored yet.
1390        assert!(got[1].1.as_deref() != Some("completed"));
1391        Ok(())
1392    }
1393
1394    #[tokio::test]
1395    async fn load_runtime_control_plane_reads_sidecar_without_messages() -> io::Result<()> {
1396        let (storage, _t) = create_temp_storage().await?;
1397        let s = session_with_history("sc-5", 5, "run-A");
1398        storage.save_session(&s).await?;
1399
1400        let cp = storage.load_runtime_control_plane("sc-5").await?.unwrap();
1401        assert!(
1402            cp.messages.is_empty(),
1403            "control-plane load must skip the message history"
1404        );
1405        assert_eq!(cp.agent_runtime_state.as_ref().unwrap().run_id, "run-A");
1406        Ok(())
1407    }
1408
1409    #[tokio::test]
1410    async fn test_save_and_load_session() -> io::Result<()> {
1411        let (storage, _temp_dir) = create_temp_storage().await?;
1412        let session = Session::new("session-1", "test-model");
1413
1414        storage.save_session(&session).await?;
1415        let loaded = storage.load_session(&session.id).await?;
1416
1417        assert!(loaded.is_some());
1418        let loaded = loaded.unwrap();
1419        assert_eq!(loaded.id, session.id);
1420        assert_eq!(loaded.model, session.model);
1421
1422        Ok(())
1423    }
1424
1425    #[tokio::test]
1426    async fn test_load_session_returns_none_when_not_found() -> io::Result<()> {
1427        let (storage, _temp_dir) = create_temp_storage().await?;
1428        let loaded = storage.load_session("nonexistent").await?;
1429        assert!(loaded.is_none());
1430        Ok(())
1431    }
1432
1433    #[tokio::test]
1434    async fn nested_grandchild_persists_under_root() -> io::Result<()> {
1435        // Nesting: a grandchild whose parent is itself a child (parent != root)
1436        // must persist (previously rejected with "no nesting") and load back
1437        // with its real parent lineage. All descendants live flat under the
1438        // tree root's directory.
1439        let (storage, _t) = create_temp_storage().await?;
1440        let root = Session::new("root-1", "m");
1441        storage.save_session(&root).await?;
1442        let child = Session::new_child_of("child-1", &root, "m", "c");
1443        storage.save_session(&child).await?;
1444        let grandchild = Session::new_child_of("gc-1", &child, "m", "g");
1445        storage.save_session(&grandchild).await?;
1446
1447        let loaded = storage.load_session("gc-1").await?.expect("grandchild");
1448        assert_eq!(loaded.parent_session_id.as_deref(), Some("child-1"));
1449        assert_eq!(loaded.root_session_id, "root-1");
1450        assert_eq!(loaded.spawn_depth, 2);
1451
1452        // The grandchild is indexed under the tree root, keyed by its real parent.
1453        let entry = storage.get_index_entry("gc-1").await.expect("indexed");
1454        assert_eq!(entry.parent_session_id.as_deref(), Some("child-1"));
1455        assert_eq!(entry.root_session_id, "root-1");
1456        Ok(())
1457    }
1458
1459    #[tokio::test]
1460    async fn test_list_index_entries_empty() -> io::Result<()> {
1461        let (storage, _temp_dir) = create_temp_storage().await?;
1462        let entries = storage.list_index_entries().await;
1463        assert!(entries.is_empty());
1464        Ok(())
1465    }
1466
1467    #[tokio::test]
1468    async fn test_list_index_entries_with_sessions() -> io::Result<()> {
1469        let (storage, _temp_dir) = create_temp_storage().await?;
1470
1471        let session1 = Session::new("session-1", "model-1");
1472        let session2 = Session::new("session-2", "model-2");
1473
1474        storage.save_session(&session1).await?;
1475        storage.save_session(&session2).await?;
1476
1477        let entries = storage.list_index_entries().await;
1478        assert_eq!(entries.len(), 2);
1479
1480        Ok(())
1481    }
1482
1483    #[tokio::test]
1484    async fn test_get_index_entry() -> io::Result<()> {
1485        let (storage, _temp_dir) = create_temp_storage().await?;
1486        let session = Session::new("session-1", "test-model");
1487
1488        storage.save_session(&session).await?;
1489
1490        let entry = storage.get_index_entry(&session.id).await;
1491        assert!(entry.is_some());
1492        let entry = entry.unwrap();
1493        assert_eq!(entry.id, session.id);
1494
1495        Ok(())
1496    }
1497
1498    #[tokio::test]
1499    async fn test_get_index_entry_returns_none_when_not_found() -> io::Result<()> {
1500        let (storage, _temp_dir) = create_temp_storage().await?;
1501        let entry = storage.get_index_entry("nonexistent").await;
1502        assert!(entry.is_none());
1503        Ok(())
1504    }
1505
1506    #[tokio::test]
1507    async fn test_delete_session() -> io::Result<()> {
1508        let (storage, _temp_dir) = create_temp_storage().await?;
1509        let session = Session::new("session-1", "test-model");
1510
1511        storage.save_session(&session).await?;
1512        assert!(storage.load_session(&session.id).await?.is_some());
1513
1514        let deleted = storage.delete_session(&session.id).await?;
1515        assert!(deleted);
1516        assert!(storage.load_session(&session.id).await?.is_none());
1517
1518        Ok(())
1519    }
1520
1521    #[tokio::test]
1522    async fn test_delete_session_returns_false_when_not_found() -> io::Result<()> {
1523        let (storage, _temp_dir) = create_temp_storage().await?;
1524        let deleted = storage.delete_session("nonexistent").await?;
1525        assert!(!deleted);
1526        Ok(())
1527    }
1528
1529    #[test]
1530    fn test_validate_session_id_empty() {
1531        assert!(validate_session_id("").is_err());
1532    }
1533
1534    #[test]
1535    fn test_validate_session_id_with_slash() {
1536        assert!(validate_session_id("session/1").is_err());
1537    }
1538
1539    #[test]
1540    fn test_validate_session_id_with_backslash() {
1541        assert!(validate_session_id("session\\1").is_err());
1542    }
1543
1544    #[test]
1545    fn test_validate_session_id_with_double_dot() {
1546        assert!(validate_session_id("session..1").is_err());
1547    }
1548
1549    #[test]
1550    fn test_validate_session_id_valid() {
1551        assert!(validate_session_id("session-123").is_ok());
1552    }
1553
1554    #[test]
1555    fn test_root_rel_path() {
1556        let path = SessionStoreV2::root_rel_path("session-123");
1557        assert_eq!(path, "sessions/session-123");
1558    }
1559
1560    #[test]
1561    fn test_child_rel_path() {
1562        let path = SessionStoreV2::child_rel_path("root-1", "child-2");
1563        assert_eq!(path, "sessions/root-1/children/child-2");
1564    }
1565
1566    #[test]
1567    fn test_mime_to_extension() {
1568        assert_eq!(mime_to_extension("image/png"), Some("png"));
1569        assert_eq!(mime_to_extension("image/jpeg"), Some("jpg"));
1570        assert_eq!(mime_to_extension("image/webp"), Some("webp"));
1571        assert_eq!(mime_to_extension("image/gif"), Some("gif"));
1572        assert_eq!(mime_to_extension("image/bmp"), Some("bmp"));
1573        assert_eq!(mime_to_extension("unknown/type"), None);
1574    }
1575
1576    #[test]
1577    fn test_extension_to_mime() {
1578        assert_eq!(extension_to_mime("png"), Some("image/png"));
1579        assert_eq!(extension_to_mime("jpg"), Some("image/jpeg"));
1580        assert_eq!(extension_to_mime("jpeg"), Some("image/jpeg"));
1581        assert_eq!(extension_to_mime("webp"), Some("image/webp"));
1582        assert_eq!(extension_to_mime("gif"), Some("image/gif"));
1583        assert_eq!(extension_to_mime("bmp"), Some("image/bmp"));
1584        assert_eq!(extension_to_mime("unknown"), None);
1585    }
1586
1587    #[test]
1588    fn test_extension_to_mime_case_insensitive() {
1589        assert_eq!(extension_to_mime("PNG"), Some("image/png"));
1590        assert_eq!(extension_to_mime("JPG"), Some("image/jpeg"));
1591        assert_eq!(extension_to_mime("JPEG"), Some("image/jpeg"));
1592    }
1593
1594    #[test]
1595    fn test_extension_to_mime_with_whitespace() {
1596        assert_eq!(extension_to_mime("  png  "), Some("image/png"));
1597        assert_eq!(extension_to_mime("\tjpg\t"), Some("image/jpeg"));
1598    }
1599}