Skip to main content

plexus_substrate/activations/claudecode/
storage.rs

1use super::types::{
2    BufferedEvent, ChatEvent, ClaudeCodeConfig, ClaudeCodeError, ClaudeCodeHandle, ClaudeCodeId,
3    ClaudeCodeInfo, ClaudeMessage, ContentBlock, Message, MessageId, MessageRole, Model,
4    NodeEvent, Position, StreamId, StreamInfo, StreamStatus,
5};
6use crate::activations::arbor::{ArborStorage, NodeId, NodeType, TreeId};
7use serde_json::Value;
8use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
16/// Configuration for ClaudeCode storage
17#[derive(Debug, Clone)]
18pub struct ClaudeCodeStorageConfig {
19    /// Path to SQLite database for ClaudeCode sessions
20    pub db_path: PathBuf,
21}
22
23impl Default for ClaudeCodeStorageConfig {
24    fn default() -> Self {
25        Self {
26            db_path: PathBuf::from("claudecode.db"),
27        }
28    }
29}
30
31/// In-memory buffer for an active stream
32#[derive(Debug)]
33struct ActiveStreamBuffer {
34    /// Stream metadata
35    info: StreamInfo,
36    /// Buffered events (in-order by seq)
37    events: Vec<BufferedEvent>,
38}
39
40/// Storage layer for ClaudeCode sessions
41pub struct ClaudeCodeStorage {
42    pool: SqlitePool,
43    arbor: Arc<ArborStorage>,
44    /// In-memory buffers for active streams
45    streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>>,
46}
47
48impl ClaudeCodeStorage {
49    /// Create a new ClaudeCode storage instance with a shared Arbor storage
50    pub async fn new(
51        config: ClaudeCodeStorageConfig,
52        arbor: Arc<ArborStorage>,
53    ) -> Result<Self, ClaudeCodeError> {
54        let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
55        let connect_options: SqliteConnectOptions = db_url.parse()
56            .map_err(|e| format!("Failed to parse database URL: {}", e))?;
57        let connect_options = connect_options.disable_statement_logging();
58        let pool = SqlitePool::connect_with(connect_options.clone())
59            .await
60            .map_err(|e| format!("Failed to connect to claudecode database: {}", e))?;
61
62        let storage = Self {
63            pool,
64            arbor,
65            streams: RwLock::new(HashMap::new()),
66        };
67        storage.run_migrations().await?;
68
69        Ok(storage)
70    }
71
72    /// Run database migrations
73    async fn run_migrations(&self) -> Result<(), ClaudeCodeError> {
74        sqlx::query(
75            r#"
76            CREATE TABLE IF NOT EXISTS claudecode_sessions (
77                id TEXT PRIMARY KEY,
78                name TEXT NOT NULL UNIQUE,
79                claude_session_id TEXT,
80                tree_id TEXT NOT NULL,
81                canonical_head TEXT NOT NULL,
82                working_dir TEXT NOT NULL,
83                model TEXT NOT NULL,
84                system_prompt TEXT,
85                mcp_config TEXT,
86                loopback_enabled INTEGER NOT NULL DEFAULT 0,
87                metadata TEXT,
88                created_at INTEGER NOT NULL,
89                updated_at INTEGER NOT NULL
90            );
91
92            CREATE TABLE IF NOT EXISTS claudecode_messages (
93                id TEXT PRIMARY KEY,
94                session_id TEXT NOT NULL,
95                role TEXT NOT NULL,
96                content TEXT NOT NULL,
97                model_id TEXT,
98                input_tokens INTEGER,
99                output_tokens INTEGER,
100                cost_usd REAL,
101                created_at INTEGER NOT NULL,
102                FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
103            );
104
105            CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_name ON claudecode_sessions(name);
106            CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_tree ON claudecode_sessions(tree_id);
107            CREATE INDEX IF NOT EXISTS idx_claudecode_messages_session ON claudecode_messages(session_id);
108
109            CREATE TABLE IF NOT EXISTS claudecode_unknown_events (
110                id TEXT PRIMARY KEY,
111                session_id TEXT,
112                event_type TEXT NOT NULL,
113                data TEXT NOT NULL,
114                created_at INTEGER NOT NULL,
115                FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
116            );
117
118            CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_session ON claudecode_unknown_events(session_id);
119            CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_type ON claudecode_unknown_events(event_type);
120            "#,
121        )
122        .execute(&self.pool)
123        .await
124        .map_err(|e| format!("Failed to run claudecode migrations: {}", e))?;
125
126        Ok(())
127    }
128
129    /// Get access to the underlying arbor storage
130    pub fn arbor(&self) -> &ArborStorage {
131        &self.arbor
132    }
133
134    // ========================================================================
135    // Session CRUD Operations
136    // ========================================================================
137
138    /// Create a new ClaudeCode session with a new conversation tree
139    pub async fn session_create(
140        &self,
141        name: String,
142        working_dir: String,
143        model: Model,
144        system_prompt: Option<String>,
145        mcp_config: Option<Value>,
146        loopback_enabled: bool,
147        metadata: Option<Value>,
148    ) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
149        let session_id = ClaudeCodeId::new_v4();
150        let now = current_timestamp();
151
152        // Create a new tree for this session
153        let tree_id = self
154            .arbor
155            .tree_create(metadata.clone(), &session_id.to_string())
156            .await
157            .map_err(|e| format!("Failed to create tree for session: {}", e))?;
158
159        // Get the root node as initial position
160        let tree = self
161            .arbor
162            .tree_get(&tree_id)
163            .await
164            .map_err(|e| format!("Failed to get tree: {}", e))?;
165        let head = Position::new(tree_id, tree.root);
166
167        let metadata_json = metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
168        let mcp_config_json = mcp_config.as_ref().map(|m| serde_json::to_string(m).unwrap());
169
170        // Try inserting with the original name first
171        let final_name = match sqlx::query(
172            "INSERT INTO claudecode_sessions (id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
173             VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
174        )
175        .bind(session_id.to_string())
176        .bind(&name)
177        .bind(head.tree_id.to_string())
178        .bind(head.node_id.to_string())
179        .bind(&working_dir)
180        .bind(model.as_str())
181        .bind(&system_prompt)
182        .bind(mcp_config_json.clone())
183        .bind(loopback_enabled)
184        .bind(metadata_json.clone())
185        .bind(now)
186        .bind(now)
187        .execute(&self.pool)
188        .await
189        {
190            Ok(_) => name,
191            Err(e) if e.to_string().contains("UNIQUE constraint failed") => {
192                // Name collision - append #uuid to make it unique
193                let unique_name = format!("{}#{}", name, session_id);
194
195                sqlx::query(
196                    "INSERT INTO claudecode_sessions (id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
197                     VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
198                )
199                .bind(session_id.to_string())
200                .bind(&unique_name)
201                .bind(head.tree_id.to_string())
202                .bind(head.node_id.to_string())
203                .bind(&working_dir)
204                .bind(model.as_str())
205                .bind(&system_prompt)
206                .bind(mcp_config_json)
207                .bind(loopback_enabled)
208                .bind(metadata_json)
209                .bind(now)
210                .bind(now)
211                .execute(&self.pool)
212                .await
213                .map_err(|e| format!("Failed to create session with unique name: {}", e))?;
214
215                unique_name
216            }
217            Err(e) => return Err(ClaudeCodeError::from(format!("Failed to create session: {}", e))),
218        };
219
220        Ok(ClaudeCodeConfig {
221            id: session_id,
222            name: final_name,
223            claude_session_id: None,
224            head,
225            working_dir,
226            model,
227            system_prompt,
228            mcp_config,
229            loopback_enabled,
230            metadata,
231            created_at: now,
232            updated_at: now,
233        })
234    }
235
236    /// Get a session by ID
237    pub async fn session_get(&self, session_id: &ClaudeCodeId) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
238        let row = sqlx::query(
239            "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
240             FROM claudecode_sessions WHERE id = ?",
241        )
242        .bind(session_id.to_string())
243        .fetch_optional(&self.pool)
244        .await
245        .map_err(|e| format!("Failed to fetch session: {}", e))?
246        .ok_or_else(|| format!("Session not found: {}", session_id))?;
247
248        self.row_to_config(row)
249    }
250
251    /// Get a session by name (supports partial matching)
252    pub async fn session_get_by_name(&self, name: &str) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
253        // Try exact match first
254        if let Some(row) = sqlx::query(
255            "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
256             FROM claudecode_sessions WHERE name = ?",
257        )
258        .bind(name)
259        .fetch_optional(&self.pool)
260        .await
261        .map_err(|e| format!("Failed to fetch session by name: {}", e))?
262        {
263            return self.row_to_config(row);
264        }
265
266        // Try partial match
267        let pattern = format!("{}%", name);
268        let rows = sqlx::query(
269            "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
270             FROM claudecode_sessions WHERE name LIKE ?",
271        )
272        .bind(&pattern)
273        .fetch_all(&self.pool)
274        .await
275        .map_err(|e| format!("Failed to fetch session by pattern: {}", e))?;
276
277        match rows.len() {
278            0 => Err(ClaudeCodeError::from(format!("Session not found with name: {}", name))),
279            1 => self.row_to_config(rows.into_iter().next().unwrap()),
280            _ => {
281                let matches: Vec<String> = rows.iter().map(|r| r.get("name")).collect();
282                Err(ClaudeCodeError::from(format!(
283                    "Ambiguous name '{}' matches multiple sessions: {}",
284                    name,
285                    matches.join(", ")
286                )))
287            }
288        }
289    }
290
291    /// List all sessions
292    pub async fn session_list(&self) -> Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> {
293        let rows = sqlx::query(
294            "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, loopback_enabled, created_at
295             FROM claudecode_sessions ORDER BY created_at DESC",
296        )
297        .fetch_all(&self.pool)
298        .await
299        .map_err(|e| format!("Failed to list sessions: {}", e))?;
300
301        let sessions: Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> = rows
302            .iter()
303            .map(|row| {
304                let id_str: String = row.get("id");
305                let tree_id_str: String = row.get("tree_id");
306                let head_str: String = row.get("canonical_head");
307                let model_str: String = row.get("model");
308                let loopback: i32 = row.get("loopback_enabled");
309
310                let tree_id = TreeId::parse_str(&tree_id_str)
311                    .map_err(|e| format!("Invalid tree ID: {}", e))?;
312                let node_id = NodeId::parse_str(&head_str)
313                    .map_err(|e| format!("Invalid node ID: {}", e))?;
314                let model = Model::from_str(&model_str)
315                    .ok_or_else(|| format!("Invalid model: {}", model_str))?;
316
317                Ok(ClaudeCodeInfo {
318                    id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid session ID: {}", e))?,
319                    name: row.get("name"),
320                    model,
321                    head: Position::new(tree_id, node_id),
322                    claude_session_id: row.get("claude_session_id"),
323                    working_dir: row.get("working_dir"),
324                    loopback_enabled: loopback != 0,
325                    created_at: row.get("created_at"),
326                })
327            })
328            .collect();
329
330        sessions
331    }
332
333    /// Update session's canonical head and optionally the Claude session ID
334    pub async fn session_update_head(
335        &self,
336        session_id: &ClaudeCodeId,
337        new_head: NodeId,
338        claude_session_id: Option<String>,
339    ) -> Result<(), ClaudeCodeError> {
340        let now = current_timestamp();
341
342        let result = if let Some(claude_id) = claude_session_id {
343            sqlx::query(
344                "UPDATE claudecode_sessions SET canonical_head = ?, claude_session_id = ?, updated_at = ? WHERE id = ?",
345            )
346            .bind(new_head.to_string())
347            .bind(claude_id)
348            .bind(now)
349            .bind(session_id.to_string())
350            .execute(&self.pool)
351            .await
352        } else {
353            sqlx::query(
354                "UPDATE claudecode_sessions SET canonical_head = ?, updated_at = ? WHERE id = ?",
355            )
356            .bind(new_head.to_string())
357            .bind(now)
358            .bind(session_id.to_string())
359            .execute(&self.pool)
360            .await
361        }
362        .map_err(|e| format!("Failed to update session head: {}", e))?;
363
364        if result.rows_affected() == 0 {
365            return Err(format!("Session not found: {}", session_id).into());
366        }
367
368        Ok(())
369    }
370
371    /// Update session configuration
372    pub async fn session_update(
373        &self,
374        session_id: &ClaudeCodeId,
375        name: Option<String>,
376        model: Option<Model>,
377        system_prompt: Option<Option<String>>,
378        mcp_config: Option<Value>,
379        metadata: Option<Value>,
380    ) -> Result<(), ClaudeCodeError> {
381        let now = current_timestamp();
382        let current = self.session_get(session_id).await?;
383
384        let new_name = name.unwrap_or(current.name);
385        let new_model = model.unwrap_or(current.model);
386        let new_prompt = system_prompt.unwrap_or(current.system_prompt);
387        let new_mcp = mcp_config.or(current.mcp_config);
388        let new_metadata = metadata.or(current.metadata);
389
390        let mcp_json = new_mcp.as_ref().map(|m| serde_json::to_string(m).unwrap());
391        let metadata_json = new_metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
392
393        sqlx::query(
394            "UPDATE claudecode_sessions SET name = ?, model = ?, system_prompt = ?, mcp_config = ?, metadata = ?, updated_at = ? WHERE id = ?",
395        )
396        .bind(&new_name)
397        .bind(new_model.as_str())
398        .bind(&new_prompt)
399        .bind(mcp_json)
400        .bind(metadata_json)
401        .bind(now)
402        .bind(session_id.to_string())
403        .execute(&self.pool)
404        .await
405        .map_err(|e| format!("Failed to update session: {}", e))?;
406
407        Ok(())
408    }
409
410    /// Delete a session (does not delete the arbor tree)
411    pub async fn session_delete(&self, session_id: &ClaudeCodeId) -> Result<(), ClaudeCodeError> {
412        let result = sqlx::query("DELETE FROM claudecode_sessions WHERE id = ?")
413            .bind(session_id.to_string())
414            .execute(&self.pool)
415            .await
416            .map_err(|e| format!("Failed to delete session: {}", e))?;
417
418        if result.rows_affected() == 0 {
419            return Err(format!("Session not found: {}", session_id).into());
420        }
421
422        Ok(())
423    }
424
425    // ========================================================================
426    // Message Operations
427    // ========================================================================
428
429    /// Create a message and return it
430    pub async fn message_create(
431        &self,
432        session_id: &ClaudeCodeId,
433        role: MessageRole,
434        content: String,
435        model_id: Option<String>,
436        input_tokens: Option<i64>,
437        output_tokens: Option<i64>,
438        cost_usd: Option<f64>,
439    ) -> Result<Message, ClaudeCodeError> {
440        let message_id = MessageId::new_v4();
441        let now = current_timestamp();
442
443        sqlx::query(
444            "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
445             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
446        )
447        .bind(message_id.to_string())
448        .bind(session_id.to_string())
449        .bind(role.as_str())
450        .bind(&content)
451        .bind(&model_id)
452        .bind(input_tokens)
453        .bind(output_tokens)
454        .bind(cost_usd)
455        .bind(now)
456        .execute(&self.pool)
457        .await
458        .map_err(|e| format!("Failed to create message: {}", e))?;
459
460        Ok(Message {
461            id: message_id,
462            session_id: *session_id,
463            role,
464            content,
465            created_at: now,
466            model_id,
467            input_tokens,
468            output_tokens,
469            cost_usd,
470        })
471    }
472
473    /// Create an ephemeral message (marked for deletion) and return it
474    pub async fn message_create_ephemeral(
475        &self,
476        session_id: &ClaudeCodeId,
477        role: MessageRole,
478        content: String,
479        model_id: Option<String>,
480        input_tokens: Option<i64>,
481        output_tokens: Option<i64>,
482        cost_usd: Option<f64>,
483    ) -> Result<Message, ClaudeCodeError> {
484        let message_id = MessageId::new_v4();
485        let now = current_timestamp();
486
487        // Insert with a special marker in metadata or a separate flag
488        // For now, we'll use a negative timestamp as a deletion marker
489        // Messages with negative created_at are ephemeral and should be cleaned up
490        let ephemeral_marker = -now;
491
492        sqlx::query(
493            "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
494             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
495        )
496        .bind(message_id.to_string())
497        .bind(session_id.to_string())
498        .bind(role.as_str())
499        .bind(&content)
500        .bind(&model_id)
501        .bind(input_tokens)
502        .bind(output_tokens)
503        .bind(cost_usd)
504        .bind(ephemeral_marker)
505        .execute(&self.pool)
506        .await
507        .map_err(|e| format!("Failed to create ephemeral message: {}", e))?;
508
509        Ok(Message {
510            id: message_id,
511            session_id: *session_id,
512            role,
513            content,
514            created_at: ephemeral_marker,
515            model_id,
516            input_tokens,
517            output_tokens,
518            cost_usd,
519        })
520    }
521
522    /// Get a message by ID
523    pub async fn message_get(&self, message_id: &MessageId) -> Result<Message, ClaudeCodeError> {
524        let row = sqlx::query(
525            "SELECT id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at
526             FROM claudecode_messages WHERE id = ?",
527        )
528        .bind(message_id.to_string())
529        .fetch_optional(&self.pool)
530        .await
531        .map_err(|e| format!("Failed to fetch message: {}", e))?
532        .ok_or_else(|| format!("Message not found: {}", message_id))?;
533
534        self.row_to_message(row)
535    }
536
537    /// Resolve a message handle identifier to a Message
538    /// Handle format: "msg-{message_id}:{role}:{name}"
539    pub async fn resolve_message_handle(&self, identifier: &str) -> Result<Message, ClaudeCodeError> {
540        let parts: Vec<&str> = identifier.splitn(3, ':').collect();
541        if parts.len() < 2 {
542            return Err(format!("Invalid message handle format: {}", identifier).into());
543        }
544
545        let msg_part = parts[0];
546        if !msg_part.starts_with("msg-") {
547            return Err(format!("Invalid message handle format: {}", identifier).into());
548        }
549
550        let message_id_str = &msg_part[4..];
551        let message_id = Uuid::parse_str(message_id_str)
552            .map_err(|e| format!("Invalid message ID in handle: {}", e))?;
553
554        self.message_get(&message_id).await
555    }
556
557    /// Create a handle for a message
558    ///
559    /// Format: `{plugin_id}@1.0.0::chat:msg-{id}:{role}:{name}`
560    /// Uses ClaudeCodeHandle enum for type-safe handle creation.
561    pub fn message_to_handle(message: &Message, name: &str) -> crate::types::Handle {
562        ClaudeCodeHandle::Message {
563            message_id: format!("msg-{}", message.id),
564            role: message.role.as_str().to_string(),
565            name: name.to_string(),
566        }.to_handle()
567    }
568
569    // ========================================================================
570    // Unknown Event Operations
571    // ========================================================================
572
573    /// Store an unknown event and return its ID (handle)
574    pub async fn unknown_event_store(
575        &self,
576        session_id: Option<&ClaudeCodeId>,
577        event_type: &str,
578        data: &Value,
579    ) -> Result<String, ClaudeCodeError> {
580        let id = Uuid::new_v4().to_string();
581        let now = current_timestamp();
582        let data_json = serde_json::to_string(data)
583            .map_err(|e| format!("Failed to serialize unknown event data: {}", e))?;
584
585        sqlx::query(
586            "INSERT INTO claudecode_unknown_events (id, session_id, event_type, data, created_at)
587             VALUES (?, ?, ?, ?, ?)",
588        )
589        .bind(&id)
590        .bind(session_id.map(|s| s.to_string()))
591        .bind(event_type)
592        .bind(&data_json)
593        .bind(now)
594        .execute(&self.pool)
595        .await
596        .map_err(|e| format!("Failed to store unknown event: {}", e))?;
597
598        Ok(id)
599    }
600
601    /// Retrieve an unknown event by ID
602    pub async fn unknown_event_get(&self, id: &str) -> Result<(String, Value), ClaudeCodeError> {
603        let row = sqlx::query(
604            "SELECT event_type, data FROM claudecode_unknown_events WHERE id = ?",
605        )
606        .bind(id)
607        .fetch_optional(&self.pool)
608        .await
609        .map_err(|e| format!("Failed to fetch unknown event: {}", e))?
610        .ok_or_else(|| format!("Unknown event not found: {}", id))?;
611
612        let event_type: String = row.get("event_type");
613        let data_json: String = row.get("data");
614        let data: Value = serde_json::from_str(&data_json)
615            .map_err(|e| format!("Failed to parse unknown event data: {}", e))?;
616
617        Ok((event_type, data))
618    }
619
620    /// List unknown events by type (for analysis/debugging)
621    pub async fn unknown_events_by_type(&self, event_type: &str) -> Result<Vec<(String, Value)>, ClaudeCodeError> {
622        let rows = sqlx::query(
623            "SELECT id, data FROM claudecode_unknown_events WHERE event_type = ? ORDER BY created_at DESC",
624        )
625        .bind(event_type)
626        .fetch_all(&self.pool)
627        .await
628        .map_err(|e| format!("Failed to list unknown events: {}", e))?;
629
630        rows.iter()
631            .map(|row| {
632                let id: String = row.get("id");
633                let data_json: String = row.get("data");
634                let data: Value = serde_json::from_str(&data_json)
635                    .map_err(|e| format!("Failed to parse unknown event data: {}", e))?;
636                Ok((id, data))
637            })
638            .collect()
639    }
640
641    // ========================================================================
642    // Stream Management (in-memory buffer for async chat)
643    // ========================================================================
644
645    /// Create a new stream buffer for async chat
646    pub async fn stream_create(
647        &self,
648        session_id: ClaudeCodeId,
649    ) -> Result<StreamId, ClaudeCodeError> {
650        let stream_id = StreamId::new_v4();
651        let now = current_timestamp();
652
653        let info = StreamInfo {
654            stream_id,
655            session_id,
656            status: StreamStatus::Running,
657            user_position: None,
658            event_count: 0,
659            read_position: 0,
660            started_at: now,
661            ended_at: None,
662            error: None,
663        };
664
665        let buffer = ActiveStreamBuffer {
666            info,
667            events: Vec::new(),
668        };
669
670        let mut streams = self.streams.write().await;
671        streams.insert(stream_id, buffer);
672
673        Ok(stream_id)
674    }
675
676    /// Set the user position for a stream (called after user message is created)
677    pub async fn stream_set_user_position(
678        &self,
679        stream_id: &StreamId,
680        position: Position,
681    ) -> Result<(), ClaudeCodeError> {
682        let mut streams = self.streams.write().await;
683        let buffer = streams.get_mut(stream_id)
684            .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
685        buffer.info.user_position = Some(position);
686        Ok(())
687    }
688
689    /// Push an event to a stream buffer
690    pub async fn stream_push_event(
691        &self,
692        stream_id: &StreamId,
693        event: ChatEvent,
694    ) -> Result<u64, ClaudeCodeError> {
695        let now = current_timestamp();
696        let mut streams = self.streams.write().await;
697        let buffer = streams.get_mut(stream_id)
698            .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
699
700        let seq = buffer.info.event_count;
701        buffer.events.push(BufferedEvent {
702            seq,
703            event,
704            timestamp: now,
705        });
706        buffer.info.event_count += 1;
707
708        Ok(seq)
709    }
710
711    /// Update stream status
712    pub async fn stream_set_status(
713        &self,
714        stream_id: &StreamId,
715        status: StreamStatus,
716        error: Option<String>,
717    ) -> Result<(), ClaudeCodeError> {
718        let now = current_timestamp();
719        let mut streams = self.streams.write().await;
720        let buffer = streams.get_mut(stream_id)
721            .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
722
723        buffer.info.status = status;
724        if status == StreamStatus::Complete || status == StreamStatus::Failed {
725            buffer.info.ended_at = Some(now);
726        }
727        if let Some(e) = error {
728            buffer.info.error = Some(e);
729        }
730
731        Ok(())
732    }
733
734    /// Get stream info
735    pub async fn stream_get_info(&self, stream_id: &StreamId) -> Result<StreamInfo, ClaudeCodeError> {
736        let streams = self.streams.read().await;
737        let buffer = streams.get(stream_id)
738            .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
739        Ok(buffer.info.clone())
740    }
741
742    /// Poll events from a stream
743    /// Returns events starting from `from_seq` up to `limit` events
744    pub async fn stream_poll(
745        &self,
746        stream_id: &StreamId,
747        from_seq: Option<u64>,
748        limit: Option<usize>,
749    ) -> Result<(StreamInfo, Vec<BufferedEvent>), ClaudeCodeError> {
750        let mut streams = self.streams.write().await;
751        let buffer = streams.get_mut(stream_id)
752            .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
753
754        let start = from_seq.unwrap_or(buffer.info.read_position) as usize;
755        let max_events = limit.unwrap_or(100);
756
757        let events: Vec<BufferedEvent> = buffer.events
758            .iter()
759            .skip(start)
760            .take(max_events)
761            .cloned()
762            .collect();
763
764        // Update read position to the end of what we returned
765        if !events.is_empty() {
766            let last_seq = events.last().unwrap().seq;
767            buffer.info.read_position = last_seq + 1;
768        }
769
770        Ok((buffer.info.clone(), events))
771    }
772
773    /// List all active streams
774    pub async fn stream_list(&self) -> Vec<StreamInfo> {
775        let streams = self.streams.read().await;
776        streams.values().map(|b| b.info.clone()).collect()
777    }
778
779    /// List active streams for a session
780    pub async fn stream_list_for_session(&self, session_id: &ClaudeCodeId) -> Vec<StreamInfo> {
781        let streams = self.streams.read().await;
782        streams
783            .values()
784            .filter(|b| &b.info.session_id == session_id)
785            .map(|b| b.info.clone())
786            .collect()
787    }
788
789    /// Remove a completed/failed stream from memory
790    /// Returns the final stream info if found
791    pub async fn stream_cleanup(&self, stream_id: &StreamId) -> Option<StreamInfo> {
792        let mut streams = self.streams.write().await;
793        streams.remove(stream_id).map(|b| b.info)
794    }
795
796    /// Check if a stream exists
797    pub async fn stream_exists(&self, stream_id: &StreamId) -> bool {
798        let streams = self.streams.read().await;
799        streams.contains_key(stream_id)
800    }
801
802    // ========================================================================
803    // Arbor Rendering (Milestone 3)
804    // ========================================================================
805
806    /// Render arbor tree path into Claude API messages format
807    ///
808    /// Walks from start to end node, parsing NodeEvent JSON from each node,
809    /// and groups into Claude messages array.
810    ///
811    /// # Algorithm
812    /// 1. Get path from start to end via arbor
813    /// 2. Parse each node's content as NodeEvent
814    /// 3. Group into messages based on event type
815    /// 4. Return messages array
816    pub async fn render_messages(
817        &self,
818        tree_id: &TreeId,
819        start: &NodeId,
820        end: &NodeId,
821    ) -> Result<Vec<ClaudeMessage>, ClaudeCodeError> {
822        // 1. Get path from root to end (returns Vec<NodeId>)
823        let node_ids = self
824            .arbor
825            .node_get_path(tree_id, end)
826            .await
827            .map_err(|e| format!("Failed to get node path: {}", e))?;
828
829        // Find the starting node in the path
830        let start_idx = node_ids
831            .iter()
832            .position(|id| id == start)
833            .ok_or_else(|| format!("Start node not found in path from root to end"))?;
834
835        let node_ids = &node_ids[start_idx..];
836
837        // 2. Get full node data for each node ID and group into messages
838        let mut messages: Vec<ClaudeMessage> = Vec::new();
839        let mut current_role: Option<String> = None;
840        let mut current_content: Vec<ContentBlock> = Vec::new();
841
842        for node_id in node_ids {
843            // Get the full node data
844            let node = self
845                .arbor
846                .node_get(tree_id, node_id)
847                .await
848                .map_err(|e| format!("Failed to get node: {}", e))?;
849
850            // Extract content from node data
851            let content = match &node.data {
852                NodeType::Text { content } => content,
853                NodeType::External { .. } => {
854                    // Skip external nodes for now
855                    continue;
856                }
857            };
858
859            // Skip empty nodes (like root)
860            if content.trim().is_empty() {
861                continue;
862            }
863
864            // Parse node content as NodeEvent
865            let event: NodeEvent = serde_json::from_str(content)
866                .map_err(|e| format!("Failed to parse node content as NodeEvent: {}", e))?;
867
868            match event {
869                NodeEvent::UserMessage { content } => {
870                    // Flush previous message if any
871                    if let Some(role) = current_role.take() {
872                        if !current_content.is_empty() {
873                            messages.push(ClaudeMessage {
874                                role,
875                                content: current_content.clone(),
876                            });
877                            current_content.clear();
878                        }
879                    }
880
881                    // Start new user message
882                    current_role = Some("user".to_string());
883                    current_content.push(ContentBlock::Text { text: content });
884                }
885
886                NodeEvent::AssistantStart => {
887                    // Flush previous message if any
888                    if let Some(role) = current_role.take() {
889                        if !current_content.is_empty() {
890                            messages.push(ClaudeMessage {
891                                role,
892                                content: current_content.clone(),
893                            });
894                            current_content.clear();
895                        }
896                    }
897
898                    // Start new assistant message
899                    current_role = Some("assistant".to_string());
900                }
901
902                NodeEvent::ContentText { text } => {
903                    // Add text content to current message
904                    current_content.push(ContentBlock::Text { text });
905                }
906
907                NodeEvent::ContentToolUse { id, name, input } => {
908                    // Add tool use to current message
909                    current_content.push(ContentBlock::ToolUse { id, name, input });
910                }
911
912                NodeEvent::ContentThinking { thinking } => {
913                    // Add thinking block to current message
914                    current_content.push(ContentBlock::Thinking { thinking });
915                }
916
917                NodeEvent::UserToolResult {
918                    tool_use_id,
919                    content,
920                    is_error,
921                } => {
922                    // Flush current assistant message if any
923                    if let Some(role) = current_role.take() {
924                        if !current_content.is_empty() {
925                            messages.push(ClaudeMessage {
926                                role,
927                                content: current_content.clone(),
928                            });
929                            current_content.clear();
930                        }
931                    }
932
933                    // Tool results become separate user messages
934                    messages.push(ClaudeMessage {
935                        role: "user".to_string(),
936                        content: vec![ContentBlock::ToolResult {
937                            tool_use_id,
938                            content,
939                            is_error,
940                        }],
941                    });
942                }
943
944                NodeEvent::AssistantComplete { usage: _ } => {
945                    // Flush current message if any
946                    if let Some(role) = current_role.take() {
947                        if !current_content.is_empty() {
948                            messages.push(ClaudeMessage {
949                                role,
950                                content: current_content.clone(),
951                            });
952                            current_content.clear();
953                        }
954                    }
955                }
956            }
957        }
958
959        // Flush any remaining message
960        if let Some(role) = current_role {
961            if !current_content.is_empty() {
962                messages.push(ClaudeMessage {
963                    role,
964                    content: current_content,
965                });
966            }
967        }
968
969        Ok(messages)
970    }
971
972    // ========================================================================
973    // Helper methods
974    // ========================================================================
975
976    fn row_to_message(&self, row: sqlx::sqlite::SqliteRow) -> Result<Message, ClaudeCodeError> {
977        let id_str: String = row.get("id");
978        let session_id_str: String = row.get("session_id");
979        let role_str: String = row.get("role");
980
981        Ok(Message {
982            id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid message ID: {}", e))?,
983            session_id: Uuid::parse_str(&session_id_str)
984                .map_err(|e| format!("Invalid session ID: {}", e))?,
985            role: MessageRole::from_str(&role_str)
986                .ok_or_else(|| format!("Invalid role: {}", role_str))?,
987            content: row.get("content"),
988            created_at: row.get("created_at"),
989            model_id: row.get("model_id"),
990            input_tokens: row.get("input_tokens"),
991            output_tokens: row.get("output_tokens"),
992            cost_usd: row.get("cost_usd"),
993        })
994    }
995
996    fn row_to_config(&self, row: sqlx::sqlite::SqliteRow) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
997        let id_str: String = row.get("id");
998        let tree_id_str: String = row.get("tree_id");
999        let head_str: String = row.get("canonical_head");
1000        let model_str: String = row.get("model");
1001        let metadata_json: Option<String> = row.get("metadata");
1002        let mcp_config_json: Option<String> = row.get("mcp_config");
1003        let loopback: i32 = row.get("loopback_enabled");
1004
1005        let tree_id = TreeId::parse_str(&tree_id_str)
1006            .map_err(|e| format!("Invalid tree ID: {}", e))?;
1007        let node_id = NodeId::parse_str(&head_str)
1008            .map_err(|e| format!("Invalid node ID: {}", e))?;
1009        let model = Model::from_str(&model_str)
1010            .ok_or_else(|| format!("Invalid model: {}", model_str))?;
1011
1012        Ok(ClaudeCodeConfig {
1013            id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid session ID: {}", e))?,
1014            name: row.get("name"),
1015            claude_session_id: row.get("claude_session_id"),
1016            head: Position::new(tree_id, node_id),
1017            working_dir: row.get("working_dir"),
1018            model,
1019            system_prompt: row.get("system_prompt"),
1020            mcp_config: mcp_config_json.and_then(|s| serde_json::from_str(&s).ok()),
1021            loopback_enabled: loopback != 0,
1022            metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
1023            created_at: row.get("created_at"),
1024            updated_at: row.get("updated_at"),
1025        })
1026    }
1027}
1028
1029/// Get current Unix timestamp in seconds
1030fn current_timestamp() -> i64 {
1031    SystemTime::now()
1032        .duration_since(UNIX_EPOCH)
1033        .unwrap()
1034        .as_secs() as i64
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039    use super::*;
1040
1041    /// Test stream buffer in-memory operations (no database needed)
1042    #[tokio::test]
1043    async fn test_stream_buffer_operations() {
1044        // Create a minimal storage with just the streams buffer
1045        let streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>> = RwLock::new(HashMap::new());
1046
1047        // Create a stream
1048        let stream_id = StreamId::new_v4();
1049        let session_id = ClaudeCodeId::new_v4();
1050        let now = current_timestamp();
1051
1052        let info = StreamInfo {
1053            stream_id,
1054            session_id,
1055            status: StreamStatus::Running,
1056            user_position: None,
1057            event_count: 0,
1058            read_position: 0,
1059            started_at: now,
1060            ended_at: None,
1061            error: None,
1062        };
1063
1064        let buffer = ActiveStreamBuffer {
1065            info,
1066            events: Vec::new(),
1067        };
1068
1069        streams.write().await.insert(stream_id, buffer);
1070
1071        // Push some events
1072        {
1073            let mut streams = streams.write().await;
1074            let buffer = streams.get_mut(&stream_id).unwrap();
1075
1076            buffer.events.push(BufferedEvent {
1077                seq: 0,
1078                event: ChatEvent::Start {
1079                    id: session_id,
1080                    user_position: Position::new(TreeId::new(), NodeId::new()),
1081                },
1082                timestamp: now,
1083            });
1084            buffer.info.event_count = 1;
1085
1086            buffer.events.push(BufferedEvent {
1087                seq: 1,
1088                event: ChatEvent::Content { text: "Hello".to_string() },
1089                timestamp: now,
1090            });
1091            buffer.info.event_count = 2;
1092        }
1093
1094        // Poll events
1095        {
1096            let mut streams = streams.write().await;
1097            let buffer = streams.get_mut(&stream_id).unwrap();
1098
1099            let events: Vec<_> = buffer.events.iter().skip(0).take(10).cloned().collect();
1100            assert_eq!(events.len(), 2);
1101            assert_eq!(events[0].seq, 0);
1102            assert_eq!(events[1].seq, 1);
1103
1104            // Update read position
1105            buffer.info.read_position = 2;
1106        }
1107
1108        // Poll again - should get nothing new
1109        {
1110            let streams = streams.read().await;
1111            let buffer = streams.get(&stream_id).unwrap();
1112
1113            let events: Vec<_> = buffer.events.iter()
1114                .skip(buffer.info.read_position as usize)
1115                .take(10)
1116                .collect();
1117            assert_eq!(events.len(), 0);
1118        }
1119
1120        // Add more events
1121        {
1122            let mut streams = streams.write().await;
1123            let buffer = streams.get_mut(&stream_id).unwrap();
1124
1125            buffer.events.push(BufferedEvent {
1126                seq: 2,
1127                event: ChatEvent::Content { text: " World".to_string() },
1128                timestamp: now,
1129            });
1130            buffer.info.event_count = 3;
1131        }
1132
1133        // Poll again - should get the new event
1134        {
1135            let mut streams = streams.write().await;
1136            let buffer = streams.get_mut(&stream_id).unwrap();
1137
1138            let events: Vec<_> = buffer.events.iter()
1139                .skip(buffer.info.read_position as usize)
1140                .take(10)
1141                .cloned()
1142                .collect();
1143            assert_eq!(events.len(), 1);
1144            assert_eq!(events[0].seq, 2);
1145
1146            // Update read position
1147            buffer.info.read_position = 3;
1148        }
1149
1150        // Test status transitions
1151        {
1152            let mut streams = streams.write().await;
1153            let buffer = streams.get_mut(&stream_id).unwrap();
1154
1155            assert_eq!(buffer.info.status, StreamStatus::Running);
1156
1157            buffer.info.status = StreamStatus::AwaitingPermission;
1158            assert_eq!(buffer.info.status, StreamStatus::AwaitingPermission);
1159
1160            buffer.info.status = StreamStatus::Complete;
1161            buffer.info.ended_at = Some(current_timestamp());
1162            assert_eq!(buffer.info.status, StreamStatus::Complete);
1163            assert!(buffer.info.ended_at.is_some());
1164        }
1165    }
1166
1167    #[test]
1168    fn test_stream_status_serialization() {
1169        // Test that StreamStatus serializes correctly for MCP
1170        let status = StreamStatus::AwaitingPermission;
1171        let json = serde_json::to_string(&status).unwrap();
1172        assert_eq!(json, "\"awaiting_permission\"");
1173
1174        let status = StreamStatus::Running;
1175        let json = serde_json::to_string(&status).unwrap();
1176        assert_eq!(json, "\"running\"");
1177    }
1178}
1179
1180// ============================================================================
1181// Milestone 3 Tests: render_messages()
1182// ============================================================================
1183
1184#[cfg(test)]
1185mod render_tests {
1186    use super::*;
1187    use crate::activations::arbor::{ArborConfig, ArborError};
1188
1189    /// Helper to create test storage with arbor
1190    async fn create_test_storage() -> (ClaudeCodeStorage, PathBuf) {
1191        let temp_dir = std::env::temp_dir();
1192        let test_id = Uuid::new_v4();
1193        let arbor_path = temp_dir.join(format!("test_arbor_{}.db", test_id));
1194        let claudecode_path = temp_dir.join(format!("test_claudecode_{}.db", test_id));
1195
1196        let arbor_config = ArborConfig {
1197            db_path: arbor_path.clone(),
1198            scheduled_deletion_window: 604800,
1199            archive_window: 2592000,
1200            auto_cleanup: false, // Disable for tests
1201            cleanup_interval: 3600,
1202        };
1203        let arbor = Arc::new(ArborStorage::new(arbor_config).await.unwrap());
1204
1205        let claudecode_config = ClaudeCodeStorageConfig {
1206            db_path: claudecode_path.clone(),
1207        };
1208        let storage = ClaudeCodeStorage::new(claudecode_config, arbor)
1209            .await
1210            .unwrap();
1211
1212        (storage, arbor_path)
1213    }
1214
1215    /// Helper to create a node with NodeEvent content
1216    async fn create_event_node(
1217        arbor: &ArborStorage,
1218        tree_id: &TreeId,
1219        parent_id: &NodeId,
1220        event: &NodeEvent,
1221    ) -> Result<NodeId, ArborError> {
1222        let content = serde_json::to_string(event).map_err(|e| e.to_string())?;
1223        arbor.node_create_text(tree_id, Some(*parent_id), content, None).await
1224    }
1225
1226    #[tokio::test]
1227    async fn test_render_simple_exchange() {
1228        let (storage, _temp_path) = create_test_storage().await;
1229        let arbor = storage.arbor();
1230
1231        // Create test arbor tree
1232        let tree_id = arbor.tree_create(None, "test-session").await.unwrap();
1233        let tree = arbor.tree_get(&tree_id).await.unwrap();
1234        let root = tree.root;
1235
1236        // Build tree: root -> user -> assistant_start -> content -> assistant_complete
1237        let user_node = create_event_node(
1238            arbor,
1239            &tree_id,
1240            &root,
1241            &NodeEvent::UserMessage {
1242                content: "Hello".to_string(),
1243            },
1244        )
1245        .await
1246        .unwrap();
1247
1248        let assistant_start = create_event_node(
1249            arbor,
1250            &tree_id,
1251            &user_node,
1252            &NodeEvent::AssistantStart,
1253        )
1254        .await
1255        .unwrap();
1256
1257        let content_node = create_event_node(
1258            arbor,
1259            &tree_id,
1260            &assistant_start,
1261            &NodeEvent::ContentText {
1262                text: "Hi there!".to_string(),
1263            },
1264        )
1265        .await
1266        .unwrap();
1267
1268        let complete_node = create_event_node(
1269            arbor,
1270            &tree_id,
1271            &content_node,
1272            &NodeEvent::AssistantComplete { usage: None },
1273        )
1274        .await
1275        .unwrap();
1276
1277        // Render from root to end
1278        let messages = storage
1279            .render_messages(&tree_id, &root, &complete_node)
1280            .await
1281            .unwrap();
1282
1283        // Verify: 2 messages (user + assistant)
1284        assert_eq!(messages.len(), 2);
1285
1286        // Verify user message
1287        assert_eq!(messages[0].role, "user");
1288        assert_eq!(messages[0].content.len(), 1);
1289        if let ContentBlock::Text { text } = &messages[0].content[0] {
1290            assert_eq!(text, "Hello");
1291        } else {
1292            panic!("Expected text content block");
1293        }
1294
1295        // Verify assistant message
1296        assert_eq!(messages[1].role, "assistant");
1297        assert_eq!(messages[1].content.len(), 1);
1298        if let ContentBlock::Text { text } = &messages[1].content[0] {
1299            assert_eq!(text, "Hi there!");
1300        } else {
1301            panic!("Expected text content block");
1302        }
1303    }
1304
1305    #[tokio::test]
1306    async fn test_render_with_tool_use() {
1307        let (storage, _temp_path) = create_test_storage().await;
1308        let arbor = storage.arbor();
1309
1310        // Create test arbor tree
1311        let tree_id = arbor.tree_create(None, "test-tool-session").await.unwrap();
1312        let tree = arbor.tree_get(&tree_id).await.unwrap();
1313        let root = tree.root;
1314
1315        // Build tree with tool use:
1316        // root -> user -> assistant_start -> content -> tool_use -> assistant_complete
1317        //              -> user_tool_result -> assistant_start -> content -> assistant_complete
1318        let user_node = create_event_node(
1319            arbor,
1320            &tree_id,
1321            &root,
1322            &NodeEvent::UserMessage {
1323                content: "Write a file".to_string(),
1324            },
1325        )
1326        .await
1327        .unwrap();
1328
1329        let assistant_start1 = create_event_node(
1330            arbor,
1331            &tree_id,
1332            &user_node,
1333            &NodeEvent::AssistantStart,
1334        )
1335        .await
1336        .unwrap();
1337
1338        let content1 = create_event_node(
1339            arbor,
1340            &tree_id,
1341            &assistant_start1,
1342            &NodeEvent::ContentText {
1343                text: "I'll write the file".to_string(),
1344            },
1345        )
1346        .await
1347        .unwrap();
1348
1349        let tool_use = create_event_node(
1350            arbor,
1351            &tree_id,
1352            &content1,
1353            &NodeEvent::ContentToolUse {
1354                id: "toolu_123".to_string(),
1355                name: "Write".to_string(),
1356                input: serde_json::json!({"file_path": "/tmp/test.txt", "content": "hello"}),
1357            },
1358        )
1359        .await
1360        .unwrap();
1361
1362        let complete1 = create_event_node(
1363            arbor,
1364            &tree_id,
1365            &tool_use,
1366            &NodeEvent::AssistantComplete { usage: None },
1367        )
1368        .await
1369        .unwrap();
1370
1371        let tool_result = create_event_node(
1372            arbor,
1373            &tree_id,
1374            &complete1,
1375            &NodeEvent::UserToolResult {
1376                tool_use_id: "toolu_123".to_string(),
1377                content: "File written successfully".to_string(),
1378                is_error: false,
1379            },
1380        )
1381        .await
1382        .unwrap();
1383
1384        let assistant_start2 = create_event_node(
1385            arbor,
1386            &tree_id,
1387            &tool_result,
1388            &NodeEvent::AssistantStart,
1389        )
1390        .await
1391        .unwrap();
1392
1393        let content2 = create_event_node(
1394            arbor,
1395            &tree_id,
1396            &assistant_start2,
1397            &NodeEvent::ContentText {
1398                text: "Done!".to_string(),
1399            },
1400        )
1401        .await
1402        .unwrap();
1403
1404        let complete2 = create_event_node(
1405            arbor,
1406            &tree_id,
1407            &content2,
1408            &NodeEvent::AssistantComplete { usage: None },
1409        )
1410        .await
1411        .unwrap();
1412
1413        // Render from root to end
1414        let messages = storage
1415            .render_messages(&tree_id, &root, &complete2)
1416            .await
1417            .unwrap();
1418
1419        // Verify: 4 messages (user, assistant with tool, user with result, assistant response)
1420        assert_eq!(messages.len(), 4);
1421
1422        // Message 0: User message
1423        assert_eq!(messages[0].role, "user");
1424        assert_eq!(messages[0].content.len(), 1);
1425
1426        // Message 1: Assistant message with text + tool_use
1427        assert_eq!(messages[1].role, "assistant");
1428        assert_eq!(messages[1].content.len(), 2);
1429        if let ContentBlock::Text { text } = &messages[1].content[0] {
1430            assert_eq!(text, "I'll write the file");
1431        } else {
1432            panic!("Expected text content block");
1433        }
1434        if let ContentBlock::ToolUse { id, name, .. } = &messages[1].content[1] {
1435            assert_eq!(id, "toolu_123");
1436            assert_eq!(name, "Write");
1437        } else {
1438            panic!("Expected tool_use content block");
1439        }
1440
1441        // Message 2: User message with tool_result
1442        assert_eq!(messages[2].role, "user");
1443        assert_eq!(messages[2].content.len(), 1);
1444        if let ContentBlock::ToolResult {
1445            tool_use_id,
1446            content,
1447            is_error,
1448        } = &messages[2].content[0]
1449        {
1450            assert_eq!(tool_use_id, "toolu_123");
1451            assert_eq!(content, "File written successfully");
1452            assert_eq!(*is_error, false);
1453        } else {
1454            panic!("Expected tool_result content block");
1455        }
1456
1457        // Message 3: Assistant response
1458        assert_eq!(messages[3].role, "assistant");
1459        assert_eq!(messages[3].content.len(), 1);
1460        if let ContentBlock::Text { text } = &messages[3].content[0] {
1461            assert_eq!(text, "Done!");
1462        } else {
1463            panic!("Expected text content block");
1464        }
1465    }
1466
1467    #[tokio::test]
1468    async fn test_render_with_thinking() {
1469        let (storage, _temp_path) = create_test_storage().await;
1470        let arbor = storage.arbor();
1471
1472        // Create test arbor tree
1473        let tree_id = arbor.tree_create(None, "test-thinking").await.unwrap();
1474        let tree = arbor.tree_get(&tree_id).await.unwrap();
1475        let root = tree.root;
1476
1477        // Build tree with thinking block
1478        let user_node = create_event_node(
1479            arbor,
1480            &tree_id,
1481            &root,
1482            &NodeEvent::UserMessage {
1483                content: "Solve this problem".to_string(),
1484            },
1485        )
1486        .await
1487        .unwrap();
1488
1489        let assistant_start = create_event_node(
1490            arbor,
1491            &tree_id,
1492            &user_node,
1493            &NodeEvent::AssistantStart,
1494        )
1495        .await
1496        .unwrap();
1497
1498        let thinking = create_event_node(
1499            arbor,
1500            &tree_id,
1501            &assistant_start,
1502            &NodeEvent::ContentThinking {
1503                thinking: "Let me think about this...".to_string(),
1504            },
1505        )
1506        .await
1507        .unwrap();
1508
1509        let content = create_event_node(
1510            arbor,
1511            &tree_id,
1512            &thinking,
1513            &NodeEvent::ContentText {
1514                text: "Here's the solution".to_string(),
1515            },
1516        )
1517        .await
1518        .unwrap();
1519
1520        let complete = create_event_node(
1521            arbor,
1522            &tree_id,
1523            &content,
1524            &NodeEvent::AssistantComplete { usage: None },
1525        )
1526        .await
1527        .unwrap();
1528
1529        // Render
1530        let messages = storage
1531            .render_messages(&tree_id, &root, &complete)
1532            .await
1533            .unwrap();
1534
1535        // Verify: 2 messages
1536        assert_eq!(messages.len(), 2);
1537
1538        // Assistant message should have thinking + text
1539        assert_eq!(messages[1].role, "assistant");
1540        assert_eq!(messages[1].content.len(), 2);
1541        if let ContentBlock::Thinking { thinking } = &messages[1].content[0] {
1542            assert_eq!(thinking, "Let me think about this...");
1543        } else {
1544            panic!("Expected thinking content block");
1545        }
1546        if let ContentBlock::Text { text } = &messages[1].content[1] {
1547            assert_eq!(text, "Here's the solution");
1548        } else {
1549            panic!("Expected text content block");
1550        }
1551    }
1552
1553    #[tokio::test]
1554    async fn test_render_partial_path() {
1555        let (storage, _temp_path) = create_test_storage().await;
1556        let arbor = storage.arbor();
1557
1558        // Create test arbor tree
1559        let tree_id = arbor.tree_create(None, "test-partial").await.unwrap();
1560        let tree = arbor.tree_get(&tree_id).await.unwrap();
1561        let root = tree.root;
1562
1563        // Build tree with multiple exchanges
1564        let user1 = create_event_node(
1565            arbor,
1566            &tree_id,
1567            &root,
1568            &NodeEvent::UserMessage {
1569                content: "First message".to_string(),
1570            },
1571        )
1572        .await
1573        .unwrap();
1574
1575        let assistant_start1 = create_event_node(
1576            arbor,
1577            &tree_id,
1578            &user1,
1579            &NodeEvent::AssistantStart,
1580        )
1581        .await
1582        .unwrap();
1583
1584        let content1 = create_event_node(
1585            arbor,
1586            &tree_id,
1587            &assistant_start1,
1588            &NodeEvent::ContentText {
1589                text: "First response".to_string(),
1590            },
1591        )
1592        .await
1593        .unwrap();
1594
1595        let complete1 = create_event_node(
1596            arbor,
1597            &tree_id,
1598            &content1,
1599            &NodeEvent::AssistantComplete { usage: None },
1600        )
1601        .await
1602        .unwrap();
1603
1604        let user2 = create_event_node(
1605            arbor,
1606            &tree_id,
1607            &complete1,
1608            &NodeEvent::UserMessage {
1609                content: "Second message".to_string(),
1610            },
1611        )
1612        .await
1613        .unwrap();
1614
1615        let assistant_start2 = create_event_node(
1616            arbor,
1617            &tree_id,
1618            &user2,
1619            &NodeEvent::AssistantStart,
1620        )
1621        .await
1622        .unwrap();
1623
1624        let content2 = create_event_node(
1625            arbor,
1626            &tree_id,
1627            &assistant_start2,
1628            &NodeEvent::ContentText {
1629                text: "Second response".to_string(),
1630            },
1631        )
1632        .await
1633        .unwrap();
1634
1635        let complete2 = create_event_node(
1636            arbor,
1637            &tree_id,
1638            &content2,
1639            &NodeEvent::AssistantComplete { usage: None },
1640        )
1641        .await
1642        .unwrap();
1643
1644        // Render from user2 to end (should skip first exchange)
1645        let messages = storage
1646            .render_messages(&tree_id, &user2, &complete2)
1647            .await
1648            .unwrap();
1649
1650        // Verify: 2 messages (only second exchange)
1651        assert_eq!(messages.len(), 2);
1652        assert_eq!(messages[0].role, "user");
1653        if let ContentBlock::Text { text } = &messages[0].content[0] {
1654            assert_eq!(text, "Second message");
1655        } else {
1656            panic!("Expected text content block");
1657        }
1658    }
1659}