Skip to main content

plexus_substrate/activations/claudecode/
storage.rs

1use super::types::{
2    BufferedEvent, ChatEvent, ClaudeCodeConfig, ClaudeCodeError, ClaudeCodeHandle, ClaudeCodeId,
3    ClaudeCodeInfo, ClaudeMessage, ContentBlock, Message, MessageId, MessageRole, Model,
4    NodeEvent, Position, StreamId, StreamInfo, StreamStatus,
5};
6use crate::activations::arbor::{ArborStorage, NodeId, NodeType, TreeId};
7use crate::activations::storage::init_sqlite_pool;
8use crate::activation_db_path_from_module;
9use serde_json::Value;
10use sqlx::{sqlite::SqlitePool, Row};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
18/// Helper to create parse errors
19fn parse_err(context: &'static str, detail: impl std::fmt::Display) -> ClaudeCodeError {
20    ClaudeCodeError::Parse { context, detail: detail.to_string() }
21}
22
23/// Helper to create database errors
24fn db_err(operation: &'static str, source: sqlx::Error) -> ClaudeCodeError {
25    ClaudeCodeError::Database { operation, source }
26}
27
28/// Configuration for ClaudeCode storage
29#[derive(Debug, Clone)]
30pub struct ClaudeCodeStorageConfig {
31    /// Path to SQLite database for ClaudeCode sessions
32    pub db_path: PathBuf,
33}
34
35impl Default for ClaudeCodeStorageConfig {
36    fn default() -> Self {
37        Self {
38            db_path: activation_db_path_from_module!("claudecode.db"),
39        }
40    }
41}
42
43/// In-memory buffer for an active stream
44#[derive(Debug)]
45struct ActiveStreamBuffer {
46    /// Stream metadata
47    info: StreamInfo,
48    /// Buffered events (in-order by seq)
49    events: Vec<BufferedEvent>,
50}
51
52/// Storage layer for ClaudeCode sessions
53pub struct ClaudeCodeStorage {
54    pool: SqlitePool,
55    arbor: Arc<ArborStorage>,
56    /// In-memory buffers for active streams
57    streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>>,
58}
59
60impl ClaudeCodeStorage {
61    /// Create a new ClaudeCode storage instance with a shared Arbor storage
62    pub async fn new(
63        config: ClaudeCodeStorageConfig,
64        arbor: Arc<ArborStorage>,
65    ) -> Result<Self, ClaudeCodeError> {
66        let pool = init_sqlite_pool(config.db_path).await
67            .map_err(|e| db_err("connect", sqlx::Error::Configuration(e.into())))?;
68
69        let storage = Self {
70            pool,
71            arbor,
72            streams: RwLock::new(HashMap::new()),
73        };
74        storage.run_migrations().await?;
75
76        Ok(storage)
77    }
78
79    /// Run database migrations
80    async fn run_migrations(&self) -> Result<(), ClaudeCodeError> {
81        sqlx::query(
82            r#"
83            CREATE TABLE IF NOT EXISTS claudecode_sessions (
84                id TEXT PRIMARY KEY,
85                name TEXT NOT NULL UNIQUE,
86                claude_session_id TEXT,
87                loopback_session_id TEXT,
88                tree_id TEXT NOT NULL,
89                canonical_head TEXT NOT NULL,
90                working_dir TEXT NOT NULL,
91                model TEXT NOT NULL,
92                system_prompt TEXT,
93                mcp_config TEXT,
94                loopback_enabled INTEGER NOT NULL DEFAULT 0,
95                metadata TEXT,
96                created_at INTEGER NOT NULL,
97                updated_at INTEGER NOT NULL
98            );
99
100            CREATE TABLE IF NOT EXISTS claudecode_messages (
101                id TEXT PRIMARY KEY,
102                session_id TEXT NOT NULL,
103                role TEXT NOT NULL,
104                content TEXT NOT NULL,
105                model_id TEXT,
106                input_tokens INTEGER,
107                output_tokens INTEGER,
108                cost_usd REAL,
109                created_at INTEGER NOT NULL,
110                FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
111            );
112
113            CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_name ON claudecode_sessions(name);
114            CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_tree ON claudecode_sessions(tree_id);
115            CREATE INDEX IF NOT EXISTS idx_claudecode_messages_session ON claudecode_messages(session_id);
116
117            CREATE TABLE IF NOT EXISTS claudecode_unknown_events (
118                id TEXT PRIMARY KEY,
119                session_id TEXT,
120                event_type TEXT NOT NULL,
121                data TEXT NOT NULL,
122                created_at INTEGER NOT NULL,
123                FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
124            );
125
126            CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_session ON claudecode_unknown_events(session_id);
127            CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_type ON claudecode_unknown_events(event_type);
128            "#,
129        )
130        .execute(&self.pool)
131        .await
132        .map_err(|e| db_err("run migrations", e))?;
133
134        // Migration: add loopback_session_id if not present
135        let _ = sqlx::query(
136            "ALTER TABLE claudecode_sessions ADD COLUMN loopback_session_id TEXT",
137        )
138        .execute(&self.pool)
139        .await;
140
141        Ok(())
142    }
143
144    /// Get access to the underlying arbor storage
145    pub fn arbor(&self) -> &ArborStorage {
146        &self.arbor
147    }
148
149    // ========================================================================
150    // Session CRUD Operations
151    // ========================================================================
152
153    /// Create a new ClaudeCode session with a new conversation tree
154    pub async fn session_create(
155        &self,
156        name: String,
157        working_dir: String,
158        model: Model,
159        system_prompt: Option<String>,
160        mcp_config: Option<Value>,
161        loopback_enabled: bool,
162        claude_session_id: Option<String>,
163        loopback_session_id: Option<String>,
164        metadata: Option<Value>,
165    ) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
166        let session_id = ClaudeCodeId::new_v4();
167        let now = current_timestamp();
168
169        // Create a new tree for this session
170        let tree_id = self
171            .arbor
172            .tree_create(metadata.clone(), &session_id.to_string())
173            .await
174            .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
175
176        // Get the root node as initial position
177        let tree = self
178            .arbor
179            .tree_get(&tree_id)
180            .await
181            .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
182        let head = Position::new(tree_id, tree.root);
183
184        let metadata_json = metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
185        let mcp_config_json = mcp_config.as_ref().map(|m| serde_json::to_string(m).unwrap());
186
187        // Try inserting with the original name first
188        let final_name = match sqlx::query(
189            "INSERT INTO claudecode_sessions (id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
190             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
191        )
192        .bind(session_id.to_string())
193        .bind(&name)
194        .bind(&claude_session_id)
195        .bind(&loopback_session_id)
196        .bind(head.tree_id.to_string())
197        .bind(head.node_id.to_string())
198        .bind(&working_dir)
199        .bind(model.as_str())
200        .bind(&system_prompt)
201        .bind(mcp_config_json.clone())
202        .bind(loopback_enabled)
203        .bind(metadata_json.clone())
204        .bind(now)
205        .bind(now)
206        .execute(&self.pool)
207        .await
208        {
209            Ok(_) => name,
210            Err(e) if e.to_string().contains("UNIQUE constraint failed") => {
211                // Name collision - append #uuid to make it unique
212                let unique_name = format!("{}#{}", name, session_id);
213
214                sqlx::query(
215                    "INSERT INTO claudecode_sessions (id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
216                     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
217                )
218                .bind(session_id.to_string())
219                .bind(&unique_name)
220                .bind(&claude_session_id)
221                .bind(&loopback_session_id)
222                .bind(head.tree_id.to_string())
223                .bind(head.node_id.to_string())
224                .bind(&working_dir)
225                .bind(model.as_str())
226                .bind(&system_prompt)
227                .bind(mcp_config_json)
228                .bind(loopback_enabled)
229                .bind(metadata_json)
230                .bind(now)
231                .bind(now)
232                .execute(&self.pool)
233                .await
234                .map_err(|e| db_err("create session (unique name)", e))?;
235
236                unique_name
237            }
238            Err(e) => return Err(db_err("create session", e)),
239        };
240
241        Ok(ClaudeCodeConfig {
242            id: session_id,
243            name: final_name,
244            claude_session_id,
245            loopback_session_id,
246            head,
247            working_dir,
248            model,
249            system_prompt,
250            mcp_config,
251            loopback_enabled,
252            metadata,
253            created_at: now,
254            updated_at: now,
255        })
256    }
257
258    /// Get a session by ID
259    pub async fn session_get(&self, session_id: &ClaudeCodeId) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
260        let row = sqlx::query(
261            "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
262             FROM claudecode_sessions WHERE id = ?",
263        )
264        .bind(session_id.to_string())
265        .fetch_optional(&self.pool)
266        .await
267        .map_err(|e| db_err("fetch session", e))?
268        .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() })?;
269
270        self.row_to_config(row)
271    }
272
273    /// Get a session by name (supports partial matching)
274    pub async fn session_get_by_name(&self, name: &str) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
275        // Try exact match first
276        if let Some(row) = sqlx::query(
277            "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
278             FROM claudecode_sessions WHERE name = ?",
279        )
280        .bind(name)
281        .fetch_optional(&self.pool)
282        .await
283        .map_err(|e| db_err("fetch session by name", e))?
284        {
285            return self.row_to_config(row);
286        }
287
288        // Try partial match
289        let pattern = format!("{}%", name);
290        let rows = sqlx::query(
291            "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
292             FROM claudecode_sessions WHERE name LIKE ?",
293        )
294        .bind(&pattern)
295        .fetch_all(&self.pool)
296        .await
297        .map_err(|e| db_err("fetch session by pattern", e))?;
298
299        match rows.len() {
300            0 => Err(ClaudeCodeError::SessionNotFound { identifier: name.to_string() }),
301            1 => self.row_to_config(rows.into_iter().next().unwrap()),
302            _ => {
303                let matches: Vec<String> = rows.iter().map(|r| r.get("name")).collect();
304                Err(ClaudeCodeError::AmbiguousSession {
305                    name: name.to_string(),
306                    matches: matches.join(", "),
307                })
308            }
309        }
310    }
311
312    /// List all sessions
313    pub async fn session_list(&self) -> Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> {
314        let rows = sqlx::query(
315            "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, loopback_enabled, created_at
316             FROM claudecode_sessions ORDER BY created_at DESC",
317        )
318        .fetch_all(&self.pool)
319        .await
320        .map_err(|e| db_err("list sessions", e))?;
321
322        let sessions: Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> = rows
323            .iter()
324            .map(|row| {
325                let id_str: String = row.get("id");
326                let tree_id_str: String = row.get("tree_id");
327                let head_str: String = row.get("canonical_head");
328                let model_str: String = row.get("model");
329                let loopback: i32 = row.get("loopback_enabled");
330
331                let tree_id = TreeId::parse_str(&tree_id_str)
332                    .map_err(|e| parse_err("tree ID", e))?;
333                let node_id = NodeId::parse_str(&head_str)
334                    .map_err(|e| parse_err("node ID", e))?;
335                let model = Model::from_str(&model_str)
336                    .ok_or_else(|| parse_err("model", &model_str))?;
337
338                Ok(ClaudeCodeInfo {
339                    id: Uuid::parse_str(&id_str).map_err(|e| parse_err("session ID", e))?,
340                    name: row.get("name"),
341                    model,
342                    head: Position::new(tree_id, node_id),
343                    claude_session_id: row.get("claude_session_id"),
344                    working_dir: row.get("working_dir"),
345                    loopback_enabled: loopback != 0,
346                    created_at: row.get("created_at"),
347                })
348            })
349            .collect();
350
351        sessions
352    }
353
354    /// Update session's canonical head and optionally the Claude session ID
355    pub async fn session_update_head(
356        &self,
357        session_id: &ClaudeCodeId,
358        new_head: NodeId,
359        claude_session_id: Option<String>,
360    ) -> Result<(), ClaudeCodeError> {
361        let now = current_timestamp();
362
363        let result = if let Some(claude_id) = claude_session_id {
364            sqlx::query(
365                "UPDATE claudecode_sessions SET canonical_head = ?, claude_session_id = ?, updated_at = ? WHERE id = ?",
366            )
367            .bind(new_head.to_string())
368            .bind(claude_id)
369            .bind(now)
370            .bind(session_id.to_string())
371            .execute(&self.pool)
372            .await
373        } else {
374            sqlx::query(
375                "UPDATE claudecode_sessions SET canonical_head = ?, updated_at = ? WHERE id = ?",
376            )
377            .bind(new_head.to_string())
378            .bind(now)
379            .bind(session_id.to_string())
380            .execute(&self.pool)
381            .await
382        }
383        .map_err(|e| db_err("update session head", e))?;
384
385        if result.rows_affected() == 0 {
386            return Err(ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() });
387        }
388
389        Ok(())
390    }
391
392    /// Update the loopback_session_id after session creation
393    pub async fn session_update_loopback_id(
394        &self,
395        session_id: &ClaudeCodeId,
396        loopback_session_id: String,
397    ) -> Result<(), ClaudeCodeError> {
398        let now = current_timestamp();
399        sqlx::query(
400            "UPDATE claudecode_sessions SET loopback_session_id = ?, updated_at = ? WHERE id = ?",
401        )
402        .bind(&loopback_session_id)
403        .bind(now)
404        .bind(session_id.to_string())
405        .execute(&self.pool)
406        .await
407        .map_err(|e| db_err("update loopback_session_id", e))?;
408        Ok(())
409    }
410
411    /// Update the claude_session_id (real Claude UUID) after first successful chat
412    pub async fn session_update_claude_id(
413        &self,
414        session_id: &ClaudeCodeId,
415        claude_session_id: String,
416    ) -> Result<(), ClaudeCodeError> {
417        let now = current_timestamp();
418        sqlx::query(
419            "UPDATE claudecode_sessions SET claude_session_id = ?, updated_at = ? WHERE id = ?",
420        )
421        .bind(&claude_session_id)
422        .bind(now)
423        .bind(session_id.to_string())
424        .execute(&self.pool)
425        .await
426        .map_err(|e| db_err("update claude_session_id", e))?;
427        Ok(())
428    }
429
430    /// Update session configuration
431    pub async fn session_update(
432        &self,
433        session_id: &ClaudeCodeId,
434        name: Option<String>,
435        model: Option<Model>,
436        system_prompt: Option<Option<String>>,
437        mcp_config: Option<Value>,
438        metadata: Option<Value>,
439    ) -> Result<(), ClaudeCodeError> {
440        let now = current_timestamp();
441        let current = self.session_get(session_id).await?;
442
443        let new_name = name.unwrap_or(current.name);
444        let new_model = model.unwrap_or(current.model);
445        let new_prompt = system_prompt.unwrap_or(current.system_prompt);
446        let new_mcp = mcp_config.or(current.mcp_config);
447        let new_metadata = metadata.or(current.metadata);
448
449        let mcp_json = new_mcp.as_ref().map(|m| serde_json::to_string(m).unwrap());
450        let metadata_json = new_metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
451
452        sqlx::query(
453            "UPDATE claudecode_sessions SET name = ?, model = ?, system_prompt = ?, mcp_config = ?, metadata = ?, updated_at = ? WHERE id = ?",
454        )
455        .bind(&new_name)
456        .bind(new_model.as_str())
457        .bind(&new_prompt)
458        .bind(mcp_json)
459        .bind(metadata_json)
460        .bind(now)
461        .bind(session_id.to_string())
462        .execute(&self.pool)
463        .await
464        .map_err(|e| db_err("update session", e))?;
465
466        Ok(())
467    }
468
469    /// Delete a session (does not delete the arbor tree)
470    pub async fn session_delete(&self, session_id: &ClaudeCodeId) -> Result<(), ClaudeCodeError> {
471        let result = sqlx::query("DELETE FROM claudecode_sessions WHERE id = ?")
472            .bind(session_id.to_string())
473            .execute(&self.pool)
474            .await
475            .map_err(|e| db_err("delete session", e))?;
476
477        if result.rows_affected() == 0 {
478            return Err(ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() });
479        }
480
481        Ok(())
482    }
483
484    // ========================================================================
485    // Message Operations
486    // ========================================================================
487
488    /// Create a message and return it
489    pub async fn message_create(
490        &self,
491        session_id: &ClaudeCodeId,
492        role: MessageRole,
493        content: String,
494        model_id: Option<String>,
495        input_tokens: Option<i64>,
496        output_tokens: Option<i64>,
497        cost_usd: Option<f64>,
498    ) -> Result<Message, ClaudeCodeError> {
499        let message_id = MessageId::new_v4();
500        let now = current_timestamp();
501
502        sqlx::query(
503            "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
504             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
505        )
506        .bind(message_id.to_string())
507        .bind(session_id.to_string())
508        .bind(role.as_str())
509        .bind(&content)
510        .bind(&model_id)
511        .bind(input_tokens)
512        .bind(output_tokens)
513        .bind(cost_usd)
514        .bind(now)
515        .execute(&self.pool)
516        .await
517        .map_err(|e| db_err("create message", e))?;
518
519        Ok(Message {
520            id: message_id,
521            session_id: *session_id,
522            role,
523            content,
524            created_at: now,
525            model_id,
526            input_tokens,
527            output_tokens,
528            cost_usd,
529        })
530    }
531
532    /// Create an ephemeral message (marked for deletion) and return it
533    pub async fn message_create_ephemeral(
534        &self,
535        session_id: &ClaudeCodeId,
536        role: MessageRole,
537        content: String,
538        model_id: Option<String>,
539        input_tokens: Option<i64>,
540        output_tokens: Option<i64>,
541        cost_usd: Option<f64>,
542    ) -> Result<Message, ClaudeCodeError> {
543        let message_id = MessageId::new_v4();
544        let now = current_timestamp();
545
546        // Insert with a special marker in metadata or a separate flag
547        // For now, we'll use a negative timestamp as a deletion marker
548        // Messages with negative created_at are ephemeral and should be cleaned up
549        let ephemeral_marker = -now;
550
551        sqlx::query(
552            "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
553             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
554        )
555        .bind(message_id.to_string())
556        .bind(session_id.to_string())
557        .bind(role.as_str())
558        .bind(&content)
559        .bind(&model_id)
560        .bind(input_tokens)
561        .bind(output_tokens)
562        .bind(cost_usd)
563        .bind(ephemeral_marker)
564        .execute(&self.pool)
565        .await
566        .map_err(|e| db_err("create ephemeral message", e))?;
567
568        Ok(Message {
569            id: message_id,
570            session_id: *session_id,
571            role,
572            content,
573            created_at: ephemeral_marker,
574            model_id,
575            input_tokens,
576            output_tokens,
577            cost_usd,
578        })
579    }
580
581    /// Get a message by ID
582    pub async fn message_get(&self, message_id: &MessageId) -> Result<Message, ClaudeCodeError> {
583        let row = sqlx::query(
584            "SELECT id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at
585             FROM claudecode_messages WHERE id = ?",
586        )
587        .bind(message_id.to_string())
588        .fetch_optional(&self.pool)
589        .await
590        .map_err(|e| db_err("fetch message", e))?
591        .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("message:{}", message_id) })?;
592
593        self.row_to_message(row)
594    }
595
596    /// Resolve a message handle identifier to a Message
597    /// Handle format: "msg-{message_id}:{role}:{name}"
598    pub async fn resolve_message_handle(&self, identifier: &str) -> Result<Message, ClaudeCodeError> {
599        let parts: Vec<&str> = identifier.splitn(3, ':').collect();
600        if parts.len() < 2 {
601            return Err(parse_err("message handle", format!("invalid format: {}", identifier)));
602        }
603
604        let msg_part = parts[0];
605        if !msg_part.starts_with("msg-") {
606            return Err(parse_err("message handle", format!("invalid prefix: {}", identifier)));
607        }
608
609        let message_id_str = &msg_part[4..];
610        let message_id = Uuid::parse_str(message_id_str)
611            .map_err(|e| parse_err("message ID in handle", e))?;
612
613        self.message_get(&message_id).await
614    }
615
616    /// Create a handle for a message
617    ///
618    /// Format: `{plugin_id}@1.0.0::chat:msg-{id}:{role}:{name}`
619    /// Uses ClaudeCodeHandle enum for type-safe handle creation.
620    pub fn message_to_handle(message: &Message, name: &str) -> crate::types::Handle {
621        ClaudeCodeHandle::Message {
622            message_id: format!("msg-{}", message.id),
623            role: message.role.as_str().to_string(),
624            name: name.to_string(),
625        }.to_handle()
626    }
627
628    // ========================================================================
629    // Unknown Event Operations
630    // ========================================================================
631
632    /// Store an unknown event and return its ID (handle)
633    pub async fn unknown_event_store(
634        &self,
635        session_id: Option<&ClaudeCodeId>,
636        event_type: &str,
637        data: &Value,
638    ) -> Result<String, ClaudeCodeError> {
639        let id = Uuid::new_v4().to_string();
640        let now = current_timestamp();
641        let data_json = serde_json::to_string(data)?;
642
643        sqlx::query(
644            "INSERT INTO claudecode_unknown_events (id, session_id, event_type, data, created_at)
645             VALUES (?, ?, ?, ?, ?)",
646        )
647        .bind(&id)
648        .bind(session_id.map(|s| s.to_string()))
649        .bind(event_type)
650        .bind(&data_json)
651        .bind(now)
652        .execute(&self.pool)
653        .await
654        .map_err(|e| db_err("store unknown event", e))?;
655
656        Ok(id)
657    }
658
659    /// Retrieve an unknown event by ID
660    pub async fn unknown_event_get(&self, id: &str) -> Result<(String, Value), ClaudeCodeError> {
661        let row = sqlx::query(
662            "SELECT event_type, data FROM claudecode_unknown_events WHERE id = ?",
663        )
664        .bind(id)
665        .fetch_optional(&self.pool)
666        .await
667        .map_err(|e| db_err("fetch unknown event", e))?
668        .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("unknown_event:{}", id) })?;
669
670        let event_type: String = row.get("event_type");
671        let data_json: String = row.get("data");
672        let data: Value = serde_json::from_str(&data_json)?;
673
674        Ok((event_type, data))
675    }
676
677    /// List unknown events by type (for analysis/debugging)
678    pub async fn unknown_events_by_type(&self, event_type: &str) -> Result<Vec<(String, Value)>, ClaudeCodeError> {
679        let rows = sqlx::query(
680            "SELECT id, data FROM claudecode_unknown_events WHERE event_type = ? ORDER BY created_at DESC",
681        )
682        .bind(event_type)
683        .fetch_all(&self.pool)
684        .await
685        .map_err(|e| db_err("list unknown events", e))?;
686
687        rows.iter()
688            .map(|row| {
689                let id: String = row.get("id");
690                let data_json: String = row.get("data");
691                let data: Value = serde_json::from_str(&data_json)?;
692                Ok((id, data))
693            })
694            .collect()
695    }
696
697    // ========================================================================
698    // Stream Management (in-memory buffer for async chat)
699    // ========================================================================
700
701    /// Create a new stream buffer for async chat
702    pub async fn stream_create(
703        &self,
704        session_id: ClaudeCodeId,
705    ) -> Result<StreamId, ClaudeCodeError> {
706        let stream_id = StreamId::new_v4();
707        let now = current_timestamp();
708
709        let info = StreamInfo {
710            stream_id,
711            session_id,
712            status: StreamStatus::Running,
713            user_position: None,
714            event_count: 0,
715            read_position: 0,
716            started_at: now,
717            ended_at: None,
718            error: None,
719        };
720
721        let buffer = ActiveStreamBuffer {
722            info,
723            events: Vec::new(),
724        };
725
726        let mut streams = self.streams.write().await;
727        streams.insert(stream_id, buffer);
728
729        Ok(stream_id)
730    }
731
732    /// Set the user position for a stream (called after user message is created)
733    pub async fn stream_set_user_position(
734        &self,
735        stream_id: &StreamId,
736        position: Position,
737    ) -> Result<(), ClaudeCodeError> {
738        let mut streams = self.streams.write().await;
739        let buffer = streams.get_mut(stream_id)
740            .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
741        buffer.info.user_position = Some(position);
742        Ok(())
743    }
744
745    /// Push an event to a stream buffer
746    pub async fn stream_push_event(
747        &self,
748        stream_id: &StreamId,
749        event: ChatEvent,
750    ) -> Result<u64, ClaudeCodeError> {
751        let now = current_timestamp();
752        let mut streams = self.streams.write().await;
753        let buffer = streams.get_mut(stream_id)
754            .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
755
756        let seq = buffer.info.event_count;
757        buffer.events.push(BufferedEvent {
758            seq,
759            event,
760            timestamp: now,
761        });
762        buffer.info.event_count += 1;
763
764        Ok(seq)
765    }
766
767    /// Update stream status
768    pub async fn stream_set_status(
769        &self,
770        stream_id: &StreamId,
771        status: StreamStatus,
772        error: Option<String>,
773    ) -> Result<(), ClaudeCodeError> {
774        let now = current_timestamp();
775        let mut streams = self.streams.write().await;
776        let buffer = streams.get_mut(stream_id)
777            .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
778
779        buffer.info.status = status;
780        if status == StreamStatus::Complete || status == StreamStatus::Failed {
781            buffer.info.ended_at = Some(now);
782        }
783        if let Some(e) = error {
784            buffer.info.error = Some(e);
785        }
786
787        Ok(())
788    }
789
790    /// Get stream info
791    pub async fn stream_get_info(&self, stream_id: &StreamId) -> Result<StreamInfo, ClaudeCodeError> {
792        let streams = self.streams.read().await;
793        let buffer = streams.get(stream_id)
794            .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
795        Ok(buffer.info.clone())
796    }
797
798    /// Poll events from a stream
799    /// Returns events starting from `from_seq` up to `limit` events
800    pub async fn stream_poll(
801        &self,
802        stream_id: &StreamId,
803        from_seq: Option<u64>,
804        limit: Option<usize>,
805    ) -> Result<(StreamInfo, Vec<BufferedEvent>), ClaudeCodeError> {
806        let mut streams = self.streams.write().await;
807        let buffer = streams.get_mut(stream_id)
808            .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
809
810        let start = from_seq.unwrap_or(buffer.info.read_position) as usize;
811        let max_events = limit.unwrap_or(100);
812
813        let events: Vec<BufferedEvent> = buffer.events
814            .iter()
815            .skip(start)
816            .take(max_events)
817            .cloned()
818            .collect();
819
820        // Update read position to the end of what we returned
821        if !events.is_empty() {
822            let last_seq = events.last().unwrap().seq;
823            buffer.info.read_position = last_seq + 1;
824        }
825
826        Ok((buffer.info.clone(), events))
827    }
828
829    /// List all active streams
830    pub async fn stream_list(&self) -> Vec<StreamInfo> {
831        let streams = self.streams.read().await;
832        streams.values().map(|b| b.info.clone()).collect()
833    }
834
835    /// List active streams for a session
836    pub async fn stream_list_for_session(&self, session_id: &ClaudeCodeId) -> Vec<StreamInfo> {
837        let streams = self.streams.read().await;
838        streams
839            .values()
840            .filter(|b| &b.info.session_id == session_id)
841            .map(|b| b.info.clone())
842            .collect()
843    }
844
845    /// Remove a completed/failed stream from memory
846    /// Returns the final stream info if found
847    pub async fn stream_cleanup(&self, stream_id: &StreamId) -> Option<StreamInfo> {
848        let mut streams = self.streams.write().await;
849        streams.remove(stream_id).map(|b| b.info)
850    }
851
852    /// Check if a stream exists
853    pub async fn stream_exists(&self, stream_id: &StreamId) -> bool {
854        let streams = self.streams.read().await;
855        streams.contains_key(stream_id)
856    }
857
858    // ========================================================================
859    // Arbor Rendering (Milestone 3)
860    // ========================================================================
861
862    /// Render arbor tree path into Claude API messages format
863    ///
864    /// Walks from start to end node, parsing NodeEvent JSON from each node,
865    /// and groups into Claude messages array.
866    ///
867    /// # Algorithm
868    /// 1. Get path from start to end via arbor
869    /// 2. Parse each node's content as NodeEvent
870    /// 3. Group into messages based on event type
871    /// 4. Return messages array
872    pub async fn render_messages(
873        &self,
874        tree_id: &TreeId,
875        start: &NodeId,
876        end: &NodeId,
877    ) -> Result<Vec<ClaudeMessage>, ClaudeCodeError> {
878        // 1. Get path from root to end (returns Vec<NodeId>)
879        let node_ids = self
880            .arbor
881            .node_get_path(tree_id, end)
882            .await
883            .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
884
885        // Find the starting node in the path
886        let start_idx = node_ids
887            .iter()
888            .position(|id| id == start)
889            .ok_or_else(|| ClaudeCodeError::Arbor("start node not found in path from root to end".to_string()))?;
890
891        let node_ids = &node_ids[start_idx..];
892
893        // 2. Get full node data for each node ID and group into messages
894        let mut messages: Vec<ClaudeMessage> = Vec::new();
895        let mut current_role: Option<String> = None;
896        let mut current_content: Vec<ContentBlock> = Vec::new();
897
898        for node_id in node_ids {
899            // Get the full node data
900            let node = self
901                .arbor
902                .node_get(tree_id, node_id)
903                .await
904                .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
905
906            // Extract content from node data
907            let content = match &node.data {
908                NodeType::Text { content } => content,
909                NodeType::External { .. } => {
910                    // Skip external nodes for now
911                    continue;
912                }
913            };
914
915            // Skip empty nodes (like root)
916            if content.trim().is_empty() {
917                continue;
918            }
919
920            // Parse node content as NodeEvent
921            let event: NodeEvent = serde_json::from_str(content)?;
922
923            match event {
924                NodeEvent::UserMessage { content } => {
925                    // Flush previous message if any
926                    if let Some(role) = current_role.take() {
927                        if !current_content.is_empty() {
928                            messages.push(ClaudeMessage {
929                                role,
930                                content: current_content.clone(),
931                            });
932                            current_content.clear();
933                        }
934                    }
935
936                    // Start new user message
937                    current_role = Some("user".to_string());
938                    current_content.push(ContentBlock::Text { text: content });
939                }
940
941                NodeEvent::AssistantStart => {
942                    // Flush previous message if any
943                    if let Some(role) = current_role.take() {
944                        if !current_content.is_empty() {
945                            messages.push(ClaudeMessage {
946                                role,
947                                content: current_content.clone(),
948                            });
949                            current_content.clear();
950                        }
951                    }
952
953                    // Start new assistant message
954                    current_role = Some("assistant".to_string());
955                }
956
957                NodeEvent::ContentText { text } => {
958                    // Add text content to current message
959                    current_content.push(ContentBlock::Text { text });
960                }
961
962                NodeEvent::ContentToolUse { id, name, input } => {
963                    // Add tool use to current message
964                    current_content.push(ContentBlock::ToolUse { id, name, input });
965                }
966
967                NodeEvent::ContentThinking { thinking } => {
968                    // Add thinking block to current message
969                    current_content.push(ContentBlock::Thinking { thinking });
970                }
971
972                NodeEvent::UserToolResult {
973                    tool_use_id,
974                    content,
975                    is_error,
976                } => {
977                    // Flush current assistant message if any
978                    if let Some(role) = current_role.take() {
979                        if !current_content.is_empty() {
980                            messages.push(ClaudeMessage {
981                                role,
982                                content: current_content.clone(),
983                            });
984                            current_content.clear();
985                        }
986                    }
987
988                    // Tool results become separate user messages
989                    messages.push(ClaudeMessage {
990                        role: "user".to_string(),
991                        content: vec![ContentBlock::ToolResult {
992                            tool_use_id,
993                            content,
994                            is_error,
995                        }],
996                    });
997                }
998
999                NodeEvent::AssistantComplete { usage: _ } => {
1000                    // Flush current message if any
1001                    if let Some(role) = current_role.take() {
1002                        if !current_content.is_empty() {
1003                            messages.push(ClaudeMessage {
1004                                role,
1005                                content: current_content.clone(),
1006                            });
1007                            current_content.clear();
1008                        }
1009                    }
1010                }
1011
1012                // Debug/observability nodes — not part of conversation history
1013                NodeEvent::LaunchCommand { .. } | NodeEvent::ClaudeStderr { .. } => {}
1014            }
1015        }
1016
1017        // Flush any remaining message
1018        if let Some(role) = current_role {
1019            if !current_content.is_empty() {
1020                messages.push(ClaudeMessage {
1021                    role,
1022                    content: current_content,
1023                });
1024            }
1025        }
1026
1027        Ok(messages)
1028    }
1029
1030    // ========================================================================
1031    // Helper methods
1032    // ========================================================================
1033
1034    fn row_to_message(&self, row: sqlx::sqlite::SqliteRow) -> Result<Message, ClaudeCodeError> {
1035        let id_str: String = row.get("id");
1036        let session_id_str: String = row.get("session_id");
1037        let role_str: String = row.get("role");
1038
1039        Ok(Message {
1040            id: Uuid::parse_str(&id_str).map_err(|e| parse_err("message ID", e))?,
1041            session_id: Uuid::parse_str(&session_id_str)
1042                .map_err(|e| parse_err("session ID", e))?,
1043            role: MessageRole::from_str(&role_str)
1044                .ok_or_else(|| parse_err("role", &role_str))?,
1045            content: row.get("content"),
1046            created_at: row.get("created_at"),
1047            model_id: row.get("model_id"),
1048            input_tokens: row.get("input_tokens"),
1049            output_tokens: row.get("output_tokens"),
1050            cost_usd: row.get("cost_usd"),
1051        })
1052    }
1053
1054    fn row_to_config(&self, row: sqlx::sqlite::SqliteRow) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
1055        let id_str: String = row.get("id");
1056        let tree_id_str: String = row.get("tree_id");
1057        let head_str: String = row.get("canonical_head");
1058        let model_str: String = row.get("model");
1059        let metadata_json: Option<String> = row.get("metadata");
1060        let mcp_config_json: Option<String> = row.get("mcp_config");
1061        let loopback: i32 = row.get("loopback_enabled");
1062
1063        let tree_id = TreeId::parse_str(&tree_id_str)
1064            .map_err(|e| parse_err("tree ID", e))?;
1065        let node_id = NodeId::parse_str(&head_str)
1066            .map_err(|e| parse_err("node ID", e))?;
1067        let model = Model::from_str(&model_str)
1068            .ok_or_else(|| parse_err("model", &model_str))?;
1069
1070        Ok(ClaudeCodeConfig {
1071            id: Uuid::parse_str(&id_str).map_err(|e| parse_err("session ID", e))?,
1072            name: row.get("name"),
1073            claude_session_id: row.get("claude_session_id"),
1074            loopback_session_id: row.try_get("loopback_session_id").ok().flatten(),
1075            head: Position::new(tree_id, node_id),
1076            working_dir: row.get("working_dir"),
1077            model,
1078            system_prompt: row.get("system_prompt"),
1079            mcp_config: mcp_config_json.and_then(|s| serde_json::from_str(&s).ok()),
1080            loopback_enabled: loopback != 0,
1081            metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
1082            created_at: row.get("created_at"),
1083            updated_at: row.get("updated_at"),
1084        })
1085    }
1086}
1087
1088/// Get current Unix timestamp in seconds
1089fn current_timestamp() -> i64 {
1090    SystemTime::now()
1091        .duration_since(UNIX_EPOCH)
1092        .unwrap()
1093        .as_secs() as i64
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098    use super::*;
1099
1100    /// Test stream buffer in-memory operations (no database needed)
1101    #[tokio::test]
1102    async fn test_stream_buffer_operations() {
1103        // Create a minimal storage with just the streams buffer
1104        let streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>> = RwLock::new(HashMap::new());
1105
1106        // Create a stream
1107        let stream_id = StreamId::new_v4();
1108        let session_id = ClaudeCodeId::new_v4();
1109        let now = current_timestamp();
1110
1111        let info = StreamInfo {
1112            stream_id,
1113            session_id,
1114            status: StreamStatus::Running,
1115            user_position: None,
1116            event_count: 0,
1117            read_position: 0,
1118            started_at: now,
1119            ended_at: None,
1120            error: None,
1121        };
1122
1123        let buffer = ActiveStreamBuffer {
1124            info,
1125            events: Vec::new(),
1126        };
1127
1128        streams.write().await.insert(stream_id, buffer);
1129
1130        // Push some events
1131        {
1132            let mut streams = streams.write().await;
1133            let buffer = streams.get_mut(&stream_id).unwrap();
1134
1135            buffer.events.push(BufferedEvent {
1136                seq: 0,
1137                event: ChatEvent::Start {
1138                    id: session_id,
1139                    user_position: Position::new(TreeId::new(), NodeId::new()),
1140                },
1141                timestamp: now,
1142            });
1143            buffer.info.event_count = 1;
1144
1145            buffer.events.push(BufferedEvent {
1146                seq: 1,
1147                event: ChatEvent::Content { text: "Hello".to_string() },
1148                timestamp: now,
1149            });
1150            buffer.info.event_count = 2;
1151        }
1152
1153        // Poll events
1154        {
1155            let mut streams = streams.write().await;
1156            let buffer = streams.get_mut(&stream_id).unwrap();
1157
1158            let events: Vec<_> = buffer.events.iter().skip(0).take(10).cloned().collect();
1159            assert_eq!(events.len(), 2);
1160            assert_eq!(events[0].seq, 0);
1161            assert_eq!(events[1].seq, 1);
1162
1163            // Update read position
1164            buffer.info.read_position = 2;
1165        }
1166
1167        // Poll again - should get nothing new
1168        {
1169            let streams = streams.read().await;
1170            let buffer = streams.get(&stream_id).unwrap();
1171
1172            let events: Vec<_> = buffer.events.iter()
1173                .skip(buffer.info.read_position as usize)
1174                .take(10)
1175                .collect();
1176            assert_eq!(events.len(), 0);
1177        }
1178
1179        // Add more events
1180        {
1181            let mut streams = streams.write().await;
1182            let buffer = streams.get_mut(&stream_id).unwrap();
1183
1184            buffer.events.push(BufferedEvent {
1185                seq: 2,
1186                event: ChatEvent::Content { text: " World".to_string() },
1187                timestamp: now,
1188            });
1189            buffer.info.event_count = 3;
1190        }
1191
1192        // Poll again - should get the new event
1193        {
1194            let mut streams = streams.write().await;
1195            let buffer = streams.get_mut(&stream_id).unwrap();
1196
1197            let events: Vec<_> = buffer.events.iter()
1198                .skip(buffer.info.read_position as usize)
1199                .take(10)
1200                .cloned()
1201                .collect();
1202            assert_eq!(events.len(), 1);
1203            assert_eq!(events[0].seq, 2);
1204
1205            // Update read position
1206            buffer.info.read_position = 3;
1207        }
1208
1209        // Test status transitions
1210        {
1211            let mut streams = streams.write().await;
1212            let buffer = streams.get_mut(&stream_id).unwrap();
1213
1214            assert_eq!(buffer.info.status, StreamStatus::Running);
1215
1216            buffer.info.status = StreamStatus::AwaitingPermission;
1217            assert_eq!(buffer.info.status, StreamStatus::AwaitingPermission);
1218
1219            buffer.info.status = StreamStatus::Complete;
1220            buffer.info.ended_at = Some(current_timestamp());
1221            assert_eq!(buffer.info.status, StreamStatus::Complete);
1222            assert!(buffer.info.ended_at.is_some());
1223        }
1224    }
1225
1226    #[test]
1227    fn test_stream_status_serialization() {
1228        // Test that StreamStatus serializes correctly for MCP
1229        let status = StreamStatus::AwaitingPermission;
1230        let json = serde_json::to_string(&status).unwrap();
1231        assert_eq!(json, "\"awaiting_permission\"");
1232
1233        let status = StreamStatus::Running;
1234        let json = serde_json::to_string(&status).unwrap();
1235        assert_eq!(json, "\"running\"");
1236    }
1237}
1238
1239// ============================================================================
1240// Milestone 3 Tests: render_messages()
1241// ============================================================================
1242
1243#[cfg(test)]
1244mod render_tests {
1245    use super::*;
1246    use crate::activations::arbor::{ArborConfig, ArborError};
1247
1248    /// Helper to create test storage with arbor
1249    async fn create_test_storage() -> (ClaudeCodeStorage, PathBuf) {
1250        let temp_dir = std::env::temp_dir();
1251        let test_id = Uuid::new_v4();
1252        let arbor_path = temp_dir.join(format!("test_arbor_{}.db", test_id));
1253        let claudecode_path = temp_dir.join(format!("test_claudecode_{}.db", test_id));
1254
1255        let arbor_config = ArborConfig {
1256            db_path: arbor_path.clone(),
1257            scheduled_deletion_window: 604800,
1258            archive_window: 2592000,
1259            auto_cleanup: false, // Disable for tests
1260            cleanup_interval: 3600,
1261        };
1262        let arbor = Arc::new(ArborStorage::new(arbor_config).await.unwrap());
1263
1264        let claudecode_config = ClaudeCodeStorageConfig {
1265            db_path: claudecode_path.clone(),
1266        };
1267        let storage = ClaudeCodeStorage::new(claudecode_config, arbor)
1268            .await
1269            .unwrap();
1270
1271        (storage, arbor_path)
1272    }
1273
1274    /// Helper to create a node with NodeEvent content
1275    async fn create_event_node(
1276        arbor: &ArborStorage,
1277        tree_id: &TreeId,
1278        parent_id: &NodeId,
1279        event: &NodeEvent,
1280    ) -> Result<NodeId, ArborError> {
1281        let content = serde_json::to_string(event).map_err(|e| e.to_string())?;
1282        arbor.node_create_text(tree_id, Some(*parent_id), content, None).await
1283    }
1284
1285    #[tokio::test]
1286    async fn test_render_simple_exchange() {
1287        let (storage, _temp_path) = create_test_storage().await;
1288        let arbor = storage.arbor();
1289
1290        // Create test arbor tree
1291        let tree_id = arbor.tree_create(None, "test-session").await.unwrap();
1292        let tree = arbor.tree_get(&tree_id).await.unwrap();
1293        let root = tree.root;
1294
1295        // Build tree: root -> user -> assistant_start -> content -> assistant_complete
1296        let user_node = create_event_node(
1297            arbor,
1298            &tree_id,
1299            &root,
1300            &NodeEvent::UserMessage {
1301                content: "Hello".to_string(),
1302            },
1303        )
1304        .await
1305        .unwrap();
1306
1307        let assistant_start = create_event_node(
1308            arbor,
1309            &tree_id,
1310            &user_node,
1311            &NodeEvent::AssistantStart,
1312        )
1313        .await
1314        .unwrap();
1315
1316        let content_node = create_event_node(
1317            arbor,
1318            &tree_id,
1319            &assistant_start,
1320            &NodeEvent::ContentText {
1321                text: "Hi there!".to_string(),
1322            },
1323        )
1324        .await
1325        .unwrap();
1326
1327        let complete_node = create_event_node(
1328            arbor,
1329            &tree_id,
1330            &content_node,
1331            &NodeEvent::AssistantComplete { usage: None },
1332        )
1333        .await
1334        .unwrap();
1335
1336        // Render from root to end
1337        let messages = storage
1338            .render_messages(&tree_id, &root, &complete_node)
1339            .await
1340            .unwrap();
1341
1342        // Verify: 2 messages (user + assistant)
1343        assert_eq!(messages.len(), 2);
1344
1345        // Verify user message
1346        assert_eq!(messages[0].role, "user");
1347        assert_eq!(messages[0].content.len(), 1);
1348        if let ContentBlock::Text { text } = &messages[0].content[0] {
1349            assert_eq!(text, "Hello");
1350        } else {
1351            panic!("Expected text content block");
1352        }
1353
1354        // Verify assistant message
1355        assert_eq!(messages[1].role, "assistant");
1356        assert_eq!(messages[1].content.len(), 1);
1357        if let ContentBlock::Text { text } = &messages[1].content[0] {
1358            assert_eq!(text, "Hi there!");
1359        } else {
1360            panic!("Expected text content block");
1361        }
1362    }
1363
1364    #[tokio::test]
1365    async fn test_render_with_tool_use() {
1366        let (storage, _temp_path) = create_test_storage().await;
1367        let arbor = storage.arbor();
1368
1369        // Create test arbor tree
1370        let tree_id = arbor.tree_create(None, "test-tool-session").await.unwrap();
1371        let tree = arbor.tree_get(&tree_id).await.unwrap();
1372        let root = tree.root;
1373
1374        // Build tree with tool use:
1375        // root -> user -> assistant_start -> content -> tool_use -> assistant_complete
1376        //              -> user_tool_result -> assistant_start -> content -> assistant_complete
1377        let user_node = create_event_node(
1378            arbor,
1379            &tree_id,
1380            &root,
1381            &NodeEvent::UserMessage {
1382                content: "Write a file".to_string(),
1383            },
1384        )
1385        .await
1386        .unwrap();
1387
1388        let assistant_start = create_event_node(
1389            arbor,
1390            &tree_id,
1391            &user_node,
1392            &NodeEvent::AssistantStart,
1393        )
1394        .await
1395        .unwrap();
1396
1397        let text_node = create_event_node(
1398            arbor,
1399            &tree_id,
1400            &assistant_start,
1401            &NodeEvent::ContentText {
1402                text: "I'll write that file.".to_string(),
1403            },
1404        )
1405        .await
1406        .unwrap();
1407
1408        let tool_use_node = create_event_node(
1409            arbor,
1410            &tree_id,
1411            &text_node,
1412            &NodeEvent::ContentToolUse {
1413                id: "tool_123".to_string(),
1414                name: "write_file".to_string(),
1415                input: serde_json::json!({"path": "test.txt", "content": "hello"}),
1416            },
1417        )
1418        .await
1419        .unwrap();
1420
1421        let assistant_complete = create_event_node(
1422            arbor,
1423            &tree_id,
1424            &tool_use_node,
1425            &NodeEvent::AssistantComplete { usage: None },
1426        )
1427        .await
1428        .unwrap();
1429
1430        let tool_result = create_event_node(
1431            arbor,
1432            &tree_id,
1433            &assistant_complete,
1434            &NodeEvent::UserToolResult {
1435                tool_use_id: "tool_123".to_string(),
1436                content: "File written successfully".to_string(),
1437                is_error: false,
1438            },
1439        )
1440        .await
1441        .unwrap();
1442
1443        let assistant_start2 = create_event_node(
1444            arbor,
1445            &tree_id,
1446            &tool_result,
1447            &NodeEvent::AssistantStart,
1448        )
1449        .await
1450        .unwrap();
1451
1452        let content_node2 = create_event_node(
1453            arbor,
1454            &tree_id,
1455            &assistant_start2,
1456            &NodeEvent::ContentText {
1457                text: "Done!".to_string(),
1458            },
1459        )
1460        .await
1461        .unwrap();
1462
1463        let complete_node2 = create_event_node(
1464            arbor,
1465            &tree_id,
1466            &content_node2,
1467            &NodeEvent::AssistantComplete { usage: None },
1468        )
1469        .await
1470        .unwrap();
1471
1472        // Render full conversation
1473        let messages = storage
1474            .render_messages(&tree_id, &root, &complete_node2)
1475            .await
1476            .unwrap();
1477
1478        // Expected: user, assistant (text + tool_use), user (tool_result), assistant (text)
1479        assert_eq!(messages.len(), 4, "Expected 4 messages, got {}", messages.len());
1480
1481        assert_eq!(messages[0].role, "user");
1482        assert_eq!(messages[1].role, "assistant");
1483        assert_eq!(messages[1].content.len(), 2); // text + tool_use
1484        assert_eq!(messages[2].role, "user"); // tool result
1485        assert_eq!(messages[3].role, "assistant");
1486    }
1487}