1use super::types::{
2 BufferedEvent, ChatEvent, ClaudeCodeConfig, ClaudeCodeError, ClaudeCodeHandle, ClaudeCodeId,
3 ClaudeCodeInfo, ClaudeMessage, ContentBlock, Message, MessageId, MessageRole, Model,
4 NodeEvent, Position, StreamId, StreamInfo, StreamStatus,
5};
6use crate::activations::arbor::{ArborStorage, NodeId, NodeType, TreeId};
7use serde_json::Value;
8use sqlx::{sqlite::{SqliteConnectOptions, SqlitePool}, ConnectOptions, Row};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
/// Configuration for [`ClaudeCodeStorage`].
#[derive(Debug, Clone)]
pub struct ClaudeCodeStorageConfig {
    /// Path of the SQLite database file used for sessions, messages and unknown events.
    pub db_path: PathBuf,
}

impl Default for ClaudeCodeStorageConfig {
    /// Defaults to the relative path `claudecode.db`.
    fn default() -> Self {
        let db_path = PathBuf::from("claudecode.db");
        Self { db_path }
    }
}
30
/// In-memory state kept for a single stream: its metadata plus the events buffered so far.
#[derive(Debug)]
struct ActiveStreamBuffer {
    /// Stream metadata (status, counters, timestamps); cloned out to callers.
    info: StreamInfo,
    /// Events pushed to this stream, in push order.
    events: Vec<BufferedEvent>,
}
39
/// Storage for Claude Code sessions, messages and unknown events (SQLite) together with
/// in-memory buffers for active streams.
pub struct ClaudeCodeStorage {
    /// SQLite connection pool used by every session/message/unknown-event query.
    pool: SqlitePool,
    /// Shared arbor storage, used to create session trees and to read nodes when rendering.
    arbor: Arc<ArborStorage>,
    /// Stream buffers keyed by stream id; kept only in memory.
    streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>>,
}
47
48impl ClaudeCodeStorage {
49 pub async fn new(
51 config: ClaudeCodeStorageConfig,
52 arbor: Arc<ArborStorage>,
53 ) -> Result<Self, ClaudeCodeError> {
54 let db_url = format!("sqlite:{}?mode=rwc", config.db_path.display());
55 let connect_options: SqliteConnectOptions = db_url.parse()
56 .map_err(|e| format!("Failed to parse database URL: {}", e))?;
57 let connect_options = connect_options.disable_statement_logging();
58 let pool = SqlitePool::connect_with(connect_options.clone())
59 .await
60 .map_err(|e| format!("Failed to connect to claudecode database: {}", e))?;
61
62 let storage = Self {
63 pool,
64 arbor,
65 streams: RwLock::new(HashMap::new()),
66 };
67 storage.run_migrations().await?;
68
69 Ok(storage)
70 }
71
72 async fn run_migrations(&self) -> Result<(), ClaudeCodeError> {
74 sqlx::query(
75 r#"
76 CREATE TABLE IF NOT EXISTS claudecode_sessions (
77 id TEXT PRIMARY KEY,
78 name TEXT NOT NULL UNIQUE,
79 claude_session_id TEXT,
80 tree_id TEXT NOT NULL,
81 canonical_head TEXT NOT NULL,
82 working_dir TEXT NOT NULL,
83 model TEXT NOT NULL,
84 system_prompt TEXT,
85 mcp_config TEXT,
86 loopback_enabled INTEGER NOT NULL DEFAULT 0,
87 metadata TEXT,
88 created_at INTEGER NOT NULL,
89 updated_at INTEGER NOT NULL
90 );
91
92 CREATE TABLE IF NOT EXISTS claudecode_messages (
93 id TEXT PRIMARY KEY,
94 session_id TEXT NOT NULL,
95 role TEXT NOT NULL,
96 content TEXT NOT NULL,
97 model_id TEXT,
98 input_tokens INTEGER,
99 output_tokens INTEGER,
100 cost_usd REAL,
101 created_at INTEGER NOT NULL,
102 FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
103 );
104
105 CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_name ON claudecode_sessions(name);
106 CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_tree ON claudecode_sessions(tree_id);
107 CREATE INDEX IF NOT EXISTS idx_claudecode_messages_session ON claudecode_messages(session_id);
108
109 CREATE TABLE IF NOT EXISTS claudecode_unknown_events (
110 id TEXT PRIMARY KEY,
111 session_id TEXT,
112 event_type TEXT NOT NULL,
113 data TEXT NOT NULL,
114 created_at INTEGER NOT NULL,
115 FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
116 );
117
118 CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_session ON claudecode_unknown_events(session_id);
119 CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_type ON claudecode_unknown_events(event_type);
120 "#,
121 )
122 .execute(&self.pool)
123 .await
124 .map_err(|e| format!("Failed to run claudecode migrations: {}", e))?;
125
126 Ok(())
127 }
128
129 pub fn arbor(&self) -> &ArborStorage {
131 &self.arbor
132 }
133
134 pub async fn session_create(
140 &self,
141 name: String,
142 working_dir: String,
143 model: Model,
144 system_prompt: Option<String>,
145 mcp_config: Option<Value>,
146 loopback_enabled: bool,
147 metadata: Option<Value>,
148 ) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
149 let session_id = ClaudeCodeId::new_v4();
150 let now = current_timestamp();
151
152 let tree_id = self
154 .arbor
155 .tree_create(metadata.clone(), &session_id.to_string())
156 .await
157 .map_err(|e| format!("Failed to create tree for session: {}", e))?;
158
159 let tree = self
161 .arbor
162 .tree_get(&tree_id)
163 .await
164 .map_err(|e| format!("Failed to get tree: {}", e))?;
165 let head = Position::new(tree_id, tree.root);
166
167 let metadata_json = metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
168 let mcp_config_json = mcp_config.as_ref().map(|m| serde_json::to_string(m).unwrap());
169
170 let final_name = match sqlx::query(
172 "INSERT INTO claudecode_sessions (id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
173 VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
174 )
175 .bind(session_id.to_string())
176 .bind(&name)
177 .bind(head.tree_id.to_string())
178 .bind(head.node_id.to_string())
179 .bind(&working_dir)
180 .bind(model.as_str())
181 .bind(&system_prompt)
182 .bind(mcp_config_json.clone())
183 .bind(loopback_enabled)
184 .bind(metadata_json.clone())
185 .bind(now)
186 .bind(now)
187 .execute(&self.pool)
188 .await
189 {
190 Ok(_) => name,
191 Err(e) if e.to_string().contains("UNIQUE constraint failed") => {
192 let unique_name = format!("{}#{}", name, session_id);
194
195 sqlx::query(
196 "INSERT INTO claudecode_sessions (id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
197 VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
198 )
199 .bind(session_id.to_string())
200 .bind(&unique_name)
201 .bind(head.tree_id.to_string())
202 .bind(head.node_id.to_string())
203 .bind(&working_dir)
204 .bind(model.as_str())
205 .bind(&system_prompt)
206 .bind(mcp_config_json)
207 .bind(loopback_enabled)
208 .bind(metadata_json)
209 .bind(now)
210 .bind(now)
211 .execute(&self.pool)
212 .await
213 .map_err(|e| format!("Failed to create session with unique name: {}", e))?;
214
215 unique_name
216 }
217 Err(e) => return Err(ClaudeCodeError::from(format!("Failed to create session: {}", e))),
218 };
219
220 Ok(ClaudeCodeConfig {
221 id: session_id,
222 name: final_name,
223 claude_session_id: None,
224 head,
225 working_dir,
226 model,
227 system_prompt,
228 mcp_config,
229 loopback_enabled,
230 metadata,
231 created_at: now,
232 updated_at: now,
233 })
234 }
235
236 pub async fn session_get(&self, session_id: &ClaudeCodeId) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
238 let row = sqlx::query(
239 "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
240 FROM claudecode_sessions WHERE id = ?",
241 )
242 .bind(session_id.to_string())
243 .fetch_optional(&self.pool)
244 .await
245 .map_err(|e| format!("Failed to fetch session: {}", e))?
246 .ok_or_else(|| format!("Session not found: {}", session_id))?;
247
248 self.row_to_config(row)
249 }
250
251 pub async fn session_get_by_name(&self, name: &str) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
253 if let Some(row) = sqlx::query(
255 "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
256 FROM claudecode_sessions WHERE name = ?",
257 )
258 .bind(name)
259 .fetch_optional(&self.pool)
260 .await
261 .map_err(|e| format!("Failed to fetch session by name: {}", e))?
262 {
263 return self.row_to_config(row);
264 }
265
266 let pattern = format!("{}%", name);
268 let rows = sqlx::query(
269 "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
270 FROM claudecode_sessions WHERE name LIKE ?",
271 )
272 .bind(&pattern)
273 .fetch_all(&self.pool)
274 .await
275 .map_err(|e| format!("Failed to fetch session by pattern: {}", e))?;
276
277 match rows.len() {
278 0 => Err(ClaudeCodeError::from(format!("Session not found with name: {}", name))),
279 1 => self.row_to_config(rows.into_iter().next().unwrap()),
280 _ => {
281 let matches: Vec<String> = rows.iter().map(|r| r.get("name")).collect();
282 Err(ClaudeCodeError::from(format!(
283 "Ambiguous name '{}' matches multiple sessions: {}",
284 name,
285 matches.join(", ")
286 )))
287 }
288 }
289 }
290
291 pub async fn session_list(&self) -> Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> {
293 let rows = sqlx::query(
294 "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, loopback_enabled, created_at
295 FROM claudecode_sessions ORDER BY created_at DESC",
296 )
297 .fetch_all(&self.pool)
298 .await
299 .map_err(|e| format!("Failed to list sessions: {}", e))?;
300
301 let sessions: Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> = rows
302 .iter()
303 .map(|row| {
304 let id_str: String = row.get("id");
305 let tree_id_str: String = row.get("tree_id");
306 let head_str: String = row.get("canonical_head");
307 let model_str: String = row.get("model");
308 let loopback: i32 = row.get("loopback_enabled");
309
310 let tree_id = TreeId::parse_str(&tree_id_str)
311 .map_err(|e| format!("Invalid tree ID: {}", e))?;
312 let node_id = NodeId::parse_str(&head_str)
313 .map_err(|e| format!("Invalid node ID: {}", e))?;
314 let model = Model::from_str(&model_str)
315 .ok_or_else(|| format!("Invalid model: {}", model_str))?;
316
317 Ok(ClaudeCodeInfo {
318 id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid session ID: {}", e))?,
319 name: row.get("name"),
320 model,
321 head: Position::new(tree_id, node_id),
322 claude_session_id: row.get("claude_session_id"),
323 working_dir: row.get("working_dir"),
324 loopback_enabled: loopback != 0,
325 created_at: row.get("created_at"),
326 })
327 })
328 .collect();
329
330 sessions
331 }
332
333 pub async fn session_update_head(
335 &self,
336 session_id: &ClaudeCodeId,
337 new_head: NodeId,
338 claude_session_id: Option<String>,
339 ) -> Result<(), ClaudeCodeError> {
340 let now = current_timestamp();
341
342 let result = if let Some(claude_id) = claude_session_id {
343 sqlx::query(
344 "UPDATE claudecode_sessions SET canonical_head = ?, claude_session_id = ?, updated_at = ? WHERE id = ?",
345 )
346 .bind(new_head.to_string())
347 .bind(claude_id)
348 .bind(now)
349 .bind(session_id.to_string())
350 .execute(&self.pool)
351 .await
352 } else {
353 sqlx::query(
354 "UPDATE claudecode_sessions SET canonical_head = ?, updated_at = ? WHERE id = ?",
355 )
356 .bind(new_head.to_string())
357 .bind(now)
358 .bind(session_id.to_string())
359 .execute(&self.pool)
360 .await
361 }
362 .map_err(|e| format!("Failed to update session head: {}", e))?;
363
364 if result.rows_affected() == 0 {
365 return Err(format!("Session not found: {}", session_id).into());
366 }
367
368 Ok(())
369 }
370
371 pub async fn session_update(
373 &self,
374 session_id: &ClaudeCodeId,
375 name: Option<String>,
376 model: Option<Model>,
377 system_prompt: Option<Option<String>>,
378 mcp_config: Option<Value>,
379 metadata: Option<Value>,
380 ) -> Result<(), ClaudeCodeError> {
381 let now = current_timestamp();
382 let current = self.session_get(session_id).await?;
383
384 let new_name = name.unwrap_or(current.name);
385 let new_model = model.unwrap_or(current.model);
386 let new_prompt = system_prompt.unwrap_or(current.system_prompt);
387 let new_mcp = mcp_config.or(current.mcp_config);
388 let new_metadata = metadata.or(current.metadata);
389
390 let mcp_json = new_mcp.as_ref().map(|m| serde_json::to_string(m).unwrap());
391 let metadata_json = new_metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
392
393 sqlx::query(
394 "UPDATE claudecode_sessions SET name = ?, model = ?, system_prompt = ?, mcp_config = ?, metadata = ?, updated_at = ? WHERE id = ?",
395 )
396 .bind(&new_name)
397 .bind(new_model.as_str())
398 .bind(&new_prompt)
399 .bind(mcp_json)
400 .bind(metadata_json)
401 .bind(now)
402 .bind(session_id.to_string())
403 .execute(&self.pool)
404 .await
405 .map_err(|e| format!("Failed to update session: {}", e))?;
406
407 Ok(())
408 }
409
410 pub async fn session_delete(&self, session_id: &ClaudeCodeId) -> Result<(), ClaudeCodeError> {
412 let result = sqlx::query("DELETE FROM claudecode_sessions WHERE id = ?")
413 .bind(session_id.to_string())
414 .execute(&self.pool)
415 .await
416 .map_err(|e| format!("Failed to delete session: {}", e))?;
417
418 if result.rows_affected() == 0 {
419 return Err(format!("Session not found: {}", session_id).into());
420 }
421
422 Ok(())
423 }
424
425 pub async fn message_create(
431 &self,
432 session_id: &ClaudeCodeId,
433 role: MessageRole,
434 content: String,
435 model_id: Option<String>,
436 input_tokens: Option<i64>,
437 output_tokens: Option<i64>,
438 cost_usd: Option<f64>,
439 ) -> Result<Message, ClaudeCodeError> {
440 let message_id = MessageId::new_v4();
441 let now = current_timestamp();
442
443 sqlx::query(
444 "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
445 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
446 )
447 .bind(message_id.to_string())
448 .bind(session_id.to_string())
449 .bind(role.as_str())
450 .bind(&content)
451 .bind(&model_id)
452 .bind(input_tokens)
453 .bind(output_tokens)
454 .bind(cost_usd)
455 .bind(now)
456 .execute(&self.pool)
457 .await
458 .map_err(|e| format!("Failed to create message: {}", e))?;
459
460 Ok(Message {
461 id: message_id,
462 session_id: *session_id,
463 role,
464 content,
465 created_at: now,
466 model_id,
467 input_tokens,
468 output_tokens,
469 cost_usd,
470 })
471 }
472
473 pub async fn message_create_ephemeral(
475 &self,
476 session_id: &ClaudeCodeId,
477 role: MessageRole,
478 content: String,
479 model_id: Option<String>,
480 input_tokens: Option<i64>,
481 output_tokens: Option<i64>,
482 cost_usd: Option<f64>,
483 ) -> Result<Message, ClaudeCodeError> {
484 let message_id = MessageId::new_v4();
485 let now = current_timestamp();
486
487 let ephemeral_marker = -now;
491
492 sqlx::query(
493 "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
494 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
495 )
496 .bind(message_id.to_string())
497 .bind(session_id.to_string())
498 .bind(role.as_str())
499 .bind(&content)
500 .bind(&model_id)
501 .bind(input_tokens)
502 .bind(output_tokens)
503 .bind(cost_usd)
504 .bind(ephemeral_marker)
505 .execute(&self.pool)
506 .await
507 .map_err(|e| format!("Failed to create ephemeral message: {}", e))?;
508
509 Ok(Message {
510 id: message_id,
511 session_id: *session_id,
512 role,
513 content,
514 created_at: ephemeral_marker,
515 model_id,
516 input_tokens,
517 output_tokens,
518 cost_usd,
519 })
520 }
521
522 pub async fn message_get(&self, message_id: &MessageId) -> Result<Message, ClaudeCodeError> {
524 let row = sqlx::query(
525 "SELECT id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at
526 FROM claudecode_messages WHERE id = ?",
527 )
528 .bind(message_id.to_string())
529 .fetch_optional(&self.pool)
530 .await
531 .map_err(|e| format!("Failed to fetch message: {}", e))?
532 .ok_or_else(|| format!("Message not found: {}", message_id))?;
533
534 self.row_to_message(row)
535 }
536
537 pub async fn resolve_message_handle(&self, identifier: &str) -> Result<Message, ClaudeCodeError> {
540 let parts: Vec<&str> = identifier.splitn(3, ':').collect();
541 if parts.len() < 2 {
542 return Err(format!("Invalid message handle format: {}", identifier).into());
543 }
544
545 let msg_part = parts[0];
546 if !msg_part.starts_with("msg-") {
547 return Err(format!("Invalid message handle format: {}", identifier).into());
548 }
549
550 let message_id_str = &msg_part[4..];
551 let message_id = Uuid::parse_str(message_id_str)
552 .map_err(|e| format!("Invalid message ID in handle: {}", e))?;
553
554 self.message_get(&message_id).await
555 }
556
557 pub fn message_to_handle(message: &Message, name: &str) -> crate::types::Handle {
562 ClaudeCodeHandle::Message {
563 message_id: format!("msg-{}", message.id),
564 role: message.role.as_str().to_string(),
565 name: name.to_string(),
566 }.to_handle()
567 }
568
569 pub async fn unknown_event_store(
575 &self,
576 session_id: Option<&ClaudeCodeId>,
577 event_type: &str,
578 data: &Value,
579 ) -> Result<String, ClaudeCodeError> {
580 let id = Uuid::new_v4().to_string();
581 let now = current_timestamp();
582 let data_json = serde_json::to_string(data)
583 .map_err(|e| format!("Failed to serialize unknown event data: {}", e))?;
584
585 sqlx::query(
586 "INSERT INTO claudecode_unknown_events (id, session_id, event_type, data, created_at)
587 VALUES (?, ?, ?, ?, ?)",
588 )
589 .bind(&id)
590 .bind(session_id.map(|s| s.to_string()))
591 .bind(event_type)
592 .bind(&data_json)
593 .bind(now)
594 .execute(&self.pool)
595 .await
596 .map_err(|e| format!("Failed to store unknown event: {}", e))?;
597
598 Ok(id)
599 }
600
601 pub async fn unknown_event_get(&self, id: &str) -> Result<(String, Value), ClaudeCodeError> {
603 let row = sqlx::query(
604 "SELECT event_type, data FROM claudecode_unknown_events WHERE id = ?",
605 )
606 .bind(id)
607 .fetch_optional(&self.pool)
608 .await
609 .map_err(|e| format!("Failed to fetch unknown event: {}", e))?
610 .ok_or_else(|| format!("Unknown event not found: {}", id))?;
611
612 let event_type: String = row.get("event_type");
613 let data_json: String = row.get("data");
614 let data: Value = serde_json::from_str(&data_json)
615 .map_err(|e| format!("Failed to parse unknown event data: {}", e))?;
616
617 Ok((event_type, data))
618 }
619
620 pub async fn unknown_events_by_type(&self, event_type: &str) -> Result<Vec<(String, Value)>, ClaudeCodeError> {
622 let rows = sqlx::query(
623 "SELECT id, data FROM claudecode_unknown_events WHERE event_type = ? ORDER BY created_at DESC",
624 )
625 .bind(event_type)
626 .fetch_all(&self.pool)
627 .await
628 .map_err(|e| format!("Failed to list unknown events: {}", e))?;
629
630 rows.iter()
631 .map(|row| {
632 let id: String = row.get("id");
633 let data_json: String = row.get("data");
634 let data: Value = serde_json::from_str(&data_json)
635 .map_err(|e| format!("Failed to parse unknown event data: {}", e))?;
636 Ok((id, data))
637 })
638 .collect()
639 }
640
641 pub async fn stream_create(
647 &self,
648 session_id: ClaudeCodeId,
649 ) -> Result<StreamId, ClaudeCodeError> {
650 let stream_id = StreamId::new_v4();
651 let now = current_timestamp();
652
653 let info = StreamInfo {
654 stream_id,
655 session_id,
656 status: StreamStatus::Running,
657 user_position: None,
658 event_count: 0,
659 read_position: 0,
660 started_at: now,
661 ended_at: None,
662 error: None,
663 };
664
665 let buffer = ActiveStreamBuffer {
666 info,
667 events: Vec::new(),
668 };
669
670 let mut streams = self.streams.write().await;
671 streams.insert(stream_id, buffer);
672
673 Ok(stream_id)
674 }
675
676 pub async fn stream_set_user_position(
678 &self,
679 stream_id: &StreamId,
680 position: Position,
681 ) -> Result<(), ClaudeCodeError> {
682 let mut streams = self.streams.write().await;
683 let buffer = streams.get_mut(stream_id)
684 .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
685 buffer.info.user_position = Some(position);
686 Ok(())
687 }
688
689 pub async fn stream_push_event(
691 &self,
692 stream_id: &StreamId,
693 event: ChatEvent,
694 ) -> Result<u64, ClaudeCodeError> {
695 let now = current_timestamp();
696 let mut streams = self.streams.write().await;
697 let buffer = streams.get_mut(stream_id)
698 .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
699
700 let seq = buffer.info.event_count;
701 buffer.events.push(BufferedEvent {
702 seq,
703 event,
704 timestamp: now,
705 });
706 buffer.info.event_count += 1;
707
708 Ok(seq)
709 }
710
711 pub async fn stream_set_status(
713 &self,
714 stream_id: &StreamId,
715 status: StreamStatus,
716 error: Option<String>,
717 ) -> Result<(), ClaudeCodeError> {
718 let now = current_timestamp();
719 let mut streams = self.streams.write().await;
720 let buffer = streams.get_mut(stream_id)
721 .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
722
723 buffer.info.status = status;
724 if status == StreamStatus::Complete || status == StreamStatus::Failed {
725 buffer.info.ended_at = Some(now);
726 }
727 if let Some(e) = error {
728 buffer.info.error = Some(e);
729 }
730
731 Ok(())
732 }
733
734 pub async fn stream_get_info(&self, stream_id: &StreamId) -> Result<StreamInfo, ClaudeCodeError> {
736 let streams = self.streams.read().await;
737 let buffer = streams.get(stream_id)
738 .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
739 Ok(buffer.info.clone())
740 }
741
742 pub async fn stream_poll(
745 &self,
746 stream_id: &StreamId,
747 from_seq: Option<u64>,
748 limit: Option<usize>,
749 ) -> Result<(StreamInfo, Vec<BufferedEvent>), ClaudeCodeError> {
750 let mut streams = self.streams.write().await;
751 let buffer = streams.get_mut(stream_id)
752 .ok_or_else(|| format!("Stream not found: {}", stream_id))?;
753
754 let start = from_seq.unwrap_or(buffer.info.read_position) as usize;
755 let max_events = limit.unwrap_or(100);
756
757 let events: Vec<BufferedEvent> = buffer.events
758 .iter()
759 .skip(start)
760 .take(max_events)
761 .cloned()
762 .collect();
763
764 if !events.is_empty() {
766 let last_seq = events.last().unwrap().seq;
767 buffer.info.read_position = last_seq + 1;
768 }
769
770 Ok((buffer.info.clone(), events))
771 }
772
773 pub async fn stream_list(&self) -> Vec<StreamInfo> {
775 let streams = self.streams.read().await;
776 streams.values().map(|b| b.info.clone()).collect()
777 }
778
779 pub async fn stream_list_for_session(&self, session_id: &ClaudeCodeId) -> Vec<StreamInfo> {
781 let streams = self.streams.read().await;
782 streams
783 .values()
784 .filter(|b| &b.info.session_id == session_id)
785 .map(|b| b.info.clone())
786 .collect()
787 }
788
789 pub async fn stream_cleanup(&self, stream_id: &StreamId) -> Option<StreamInfo> {
792 let mut streams = self.streams.write().await;
793 streams.remove(stream_id).map(|b| b.info)
794 }
795
796 pub async fn stream_exists(&self, stream_id: &StreamId) -> bool {
798 let streams = self.streams.read().await;
799 streams.contains_key(stream_id)
800 }
801
802 pub async fn render_messages(
817 &self,
818 tree_id: &TreeId,
819 start: &NodeId,
820 end: &NodeId,
821 ) -> Result<Vec<ClaudeMessage>, ClaudeCodeError> {
822 let node_ids = self
824 .arbor
825 .node_get_path(tree_id, end)
826 .await
827 .map_err(|e| format!("Failed to get node path: {}", e))?;
828
829 let start_idx = node_ids
831 .iter()
832 .position(|id| id == start)
833 .ok_or_else(|| format!("Start node not found in path from root to end"))?;
834
835 let node_ids = &node_ids[start_idx..];
836
837 let mut messages: Vec<ClaudeMessage> = Vec::new();
839 let mut current_role: Option<String> = None;
840 let mut current_content: Vec<ContentBlock> = Vec::new();
841
842 for node_id in node_ids {
843 let node = self
845 .arbor
846 .node_get(tree_id, node_id)
847 .await
848 .map_err(|e| format!("Failed to get node: {}", e))?;
849
850 let content = match &node.data {
852 NodeType::Text { content } => content,
853 NodeType::External { .. } => {
854 continue;
856 }
857 };
858
859 if content.trim().is_empty() {
861 continue;
862 }
863
864 let event: NodeEvent = serde_json::from_str(content)
866 .map_err(|e| format!("Failed to parse node content as NodeEvent: {}", e))?;
867
868 match event {
869 NodeEvent::UserMessage { content } => {
870 if let Some(role) = current_role.take() {
872 if !current_content.is_empty() {
873 messages.push(ClaudeMessage {
874 role,
875 content: current_content.clone(),
876 });
877 current_content.clear();
878 }
879 }
880
881 current_role = Some("user".to_string());
883 current_content.push(ContentBlock::Text { text: content });
884 }
885
886 NodeEvent::AssistantStart => {
887 if let Some(role) = current_role.take() {
889 if !current_content.is_empty() {
890 messages.push(ClaudeMessage {
891 role,
892 content: current_content.clone(),
893 });
894 current_content.clear();
895 }
896 }
897
898 current_role = Some("assistant".to_string());
900 }
901
902 NodeEvent::ContentText { text } => {
903 current_content.push(ContentBlock::Text { text });
905 }
906
907 NodeEvent::ContentToolUse { id, name, input } => {
908 current_content.push(ContentBlock::ToolUse { id, name, input });
910 }
911
912 NodeEvent::ContentThinking { thinking } => {
913 current_content.push(ContentBlock::Thinking { thinking });
915 }
916
917 NodeEvent::UserToolResult {
918 tool_use_id,
919 content,
920 is_error,
921 } => {
922 if let Some(role) = current_role.take() {
924 if !current_content.is_empty() {
925 messages.push(ClaudeMessage {
926 role,
927 content: current_content.clone(),
928 });
929 current_content.clear();
930 }
931 }
932
933 messages.push(ClaudeMessage {
935 role: "user".to_string(),
936 content: vec![ContentBlock::ToolResult {
937 tool_use_id,
938 content,
939 is_error,
940 }],
941 });
942 }
943
944 NodeEvent::AssistantComplete { usage: _ } => {
945 if let Some(role) = current_role.take() {
947 if !current_content.is_empty() {
948 messages.push(ClaudeMessage {
949 role,
950 content: current_content.clone(),
951 });
952 current_content.clear();
953 }
954 }
955 }
956 }
957 }
958
959 if let Some(role) = current_role {
961 if !current_content.is_empty() {
962 messages.push(ClaudeMessage {
963 role,
964 content: current_content,
965 });
966 }
967 }
968
969 Ok(messages)
970 }
971
972 fn row_to_message(&self, row: sqlx::sqlite::SqliteRow) -> Result<Message, ClaudeCodeError> {
977 let id_str: String = row.get("id");
978 let session_id_str: String = row.get("session_id");
979 let role_str: String = row.get("role");
980
981 Ok(Message {
982 id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid message ID: {}", e))?,
983 session_id: Uuid::parse_str(&session_id_str)
984 .map_err(|e| format!("Invalid session ID: {}", e))?,
985 role: MessageRole::from_str(&role_str)
986 .ok_or_else(|| format!("Invalid role: {}", role_str))?,
987 content: row.get("content"),
988 created_at: row.get("created_at"),
989 model_id: row.get("model_id"),
990 input_tokens: row.get("input_tokens"),
991 output_tokens: row.get("output_tokens"),
992 cost_usd: row.get("cost_usd"),
993 })
994 }
995
996 fn row_to_config(&self, row: sqlx::sqlite::SqliteRow) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
997 let id_str: String = row.get("id");
998 let tree_id_str: String = row.get("tree_id");
999 let head_str: String = row.get("canonical_head");
1000 let model_str: String = row.get("model");
1001 let metadata_json: Option<String> = row.get("metadata");
1002 let mcp_config_json: Option<String> = row.get("mcp_config");
1003 let loopback: i32 = row.get("loopback_enabled");
1004
1005 let tree_id = TreeId::parse_str(&tree_id_str)
1006 .map_err(|e| format!("Invalid tree ID: {}", e))?;
1007 let node_id = NodeId::parse_str(&head_str)
1008 .map_err(|e| format!("Invalid node ID: {}", e))?;
1009 let model = Model::from_str(&model_str)
1010 .ok_or_else(|| format!("Invalid model: {}", model_str))?;
1011
1012 Ok(ClaudeCodeConfig {
1013 id: Uuid::parse_str(&id_str).map_err(|e| format!("Invalid session ID: {}", e))?,
1014 name: row.get("name"),
1015 claude_session_id: row.get("claude_session_id"),
1016 head: Position::new(tree_id, node_id),
1017 working_dir: row.get("working_dir"),
1018 model,
1019 system_prompt: row.get("system_prompt"),
1020 mcp_config: mcp_config_json.and_then(|s| serde_json::from_str(&s).ok()),
1021 loopback_enabled: loopback != 0,
1022 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
1023 created_at: row.get("created_at"),
1024 updated_at: row.get("updated_at"),
1025 })
1026 }
1027}
1028
/// Returns the current time as whole seconds since the Unix epoch.
///
/// Never panics: if the system clock is set before the epoch the result is the
/// negative number of whole seconds before it, and values that do not fit in an
/// `i64` saturate.
fn current_timestamp() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        // `duration()` is how far the clock is *behind* the epoch.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map(|secs| -secs)
            .unwrap_or(i64::MIN),
    }
}
1036
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a buffered text-content event with the given sequence number.
    fn text_event(seq: u64, text: &str, timestamp: i64) -> BufferedEvent {
        BufferedEvent {
            seq,
            event: ChatEvent::Content {
                text: text.to_string(),
            },
            timestamp,
        }
    }

    /// Walks a stream buffer through push, poll and status changes directly on the
    /// underlying map, without going through a database-backed storage.
    #[tokio::test]
    async fn test_stream_buffer_operations() {
        let registry: RwLock<HashMap<StreamId, ActiveStreamBuffer>> = RwLock::new(HashMap::new());

        let stream_id = StreamId::new_v4();
        let session_id = ClaudeCodeId::new_v4();
        let now = current_timestamp();

        registry.write().await.insert(
            stream_id,
            ActiveStreamBuffer {
                info: StreamInfo {
                    stream_id,
                    session_id,
                    status: StreamStatus::Running,
                    user_position: None,
                    event_count: 0,
                    read_position: 0,
                    started_at: now,
                    ended_at: None,
                    error: None,
                },
                events: Vec::new(),
            },
        );

        // Push a start event followed by one content event.
        {
            let mut guard = registry.write().await;
            let entry = guard.get_mut(&stream_id).unwrap();

            entry.events.push(BufferedEvent {
                seq: 0,
                event: ChatEvent::Start {
                    id: session_id,
                    user_position: Position::new(TreeId::new(), NodeId::new()),
                },
                timestamp: now,
            });
            entry.info.event_count = 1;

            entry.events.push(text_event(1, "Hello", now));
            entry.info.event_count = 2;
        }

        // First poll from the beginning sees both events, then advances the cursor.
        {
            let mut guard = registry.write().await;
            let entry = guard.get_mut(&stream_id).unwrap();

            let polled: Vec<_> = entry.events.iter().skip(0).take(10).cloned().collect();
            assert_eq!(polled.len(), 2);
            assert_eq!(polled[0].seq, 0);
            assert_eq!(polled[1].seq, 1);

            entry.info.read_position = 2;
        }

        // Polling again from the cursor yields nothing new.
        {
            let guard = registry.read().await;
            let entry = guard.get(&stream_id).unwrap();

            let polled: Vec<_> = entry
                .events
                .iter()
                .skip(entry.info.read_position as usize)
                .take(10)
                .collect();
            assert_eq!(polled.len(), 0);
        }

        // Push one more content event.
        {
            let mut guard = registry.write().await;
            let entry = guard.get_mut(&stream_id).unwrap();

            entry.events.push(text_event(2, " World", now));
            entry.info.event_count = 3;
        }

        // The next poll sees only the new event.
        {
            let mut guard = registry.write().await;
            let entry = guard.get_mut(&stream_id).unwrap();

            let polled: Vec<_> = entry
                .events
                .iter()
                .skip(entry.info.read_position as usize)
                .take(10)
                .cloned()
                .collect();
            assert_eq!(polled.len(), 1);
            assert_eq!(polled[0].seq, 2);

            entry.info.read_position = 3;
        }

        // Status transitions: Running -> AwaitingPermission -> Complete.
        {
            let mut guard = registry.write().await;
            let entry = guard.get_mut(&stream_id).unwrap();

            assert_eq!(entry.info.status, StreamStatus::Running);

            entry.info.status = StreamStatus::AwaitingPermission;
            assert_eq!(entry.info.status, StreamStatus::AwaitingPermission);

            entry.info.status = StreamStatus::Complete;
            entry.info.ended_at = Some(current_timestamp());
            assert_eq!(entry.info.status, StreamStatus::Complete);
            assert!(entry.info.ended_at.is_some());
        }
    }

    /// Stream statuses serialize to snake_case JSON strings.
    #[test]
    fn test_stream_status_serialization() {
        let cases = [
            (StreamStatus::AwaitingPermission, "\"awaiting_permission\""),
            (StreamStatus::Running, "\"running\""),
        ];

        for (status, expected) in cases.iter() {
            let json = serde_json::to_string(status).unwrap();
            assert_eq!(json, *expected);
        }
    }
}
1179
1180#[cfg(test)]
1185mod render_tests {
1186 use super::*;
1187 use crate::activations::arbor::{ArborConfig, ArborError};
1188
1189 async fn create_test_storage() -> (ClaudeCodeStorage, PathBuf) {
1191 let temp_dir = std::env::temp_dir();
1192 let test_id = Uuid::new_v4();
1193 let arbor_path = temp_dir.join(format!("test_arbor_{}.db", test_id));
1194 let claudecode_path = temp_dir.join(format!("test_claudecode_{}.db", test_id));
1195
1196 let arbor_config = ArborConfig {
1197 db_path: arbor_path.clone(),
1198 scheduled_deletion_window: 604800,
1199 archive_window: 2592000,
1200 auto_cleanup: false, cleanup_interval: 3600,
1202 };
1203 let arbor = Arc::new(ArborStorage::new(arbor_config).await.unwrap());
1204
1205 let claudecode_config = ClaudeCodeStorageConfig {
1206 db_path: claudecode_path.clone(),
1207 };
1208 let storage = ClaudeCodeStorage::new(claudecode_config, arbor)
1209 .await
1210 .unwrap();
1211
1212 (storage, arbor_path)
1213 }
1214
1215 async fn create_event_node(
1217 arbor: &ArborStorage,
1218 tree_id: &TreeId,
1219 parent_id: &NodeId,
1220 event: &NodeEvent,
1221 ) -> Result<NodeId, ArborError> {
1222 let content = serde_json::to_string(event).map_err(|e| e.to_string())?;
1223 arbor.node_create_text(tree_id, Some(*parent_id), content, None).await
1224 }
1225
1226 #[tokio::test]
1227 async fn test_render_simple_exchange() {
1228 let (storage, _temp_path) = create_test_storage().await;
1229 let arbor = storage.arbor();
1230
1231 let tree_id = arbor.tree_create(None, "test-session").await.unwrap();
1233 let tree = arbor.tree_get(&tree_id).await.unwrap();
1234 let root = tree.root;
1235
1236 let user_node = create_event_node(
1238 arbor,
1239 &tree_id,
1240 &root,
1241 &NodeEvent::UserMessage {
1242 content: "Hello".to_string(),
1243 },
1244 )
1245 .await
1246 .unwrap();
1247
1248 let assistant_start = create_event_node(
1249 arbor,
1250 &tree_id,
1251 &user_node,
1252 &NodeEvent::AssistantStart,
1253 )
1254 .await
1255 .unwrap();
1256
1257 let content_node = create_event_node(
1258 arbor,
1259 &tree_id,
1260 &assistant_start,
1261 &NodeEvent::ContentText {
1262 text: "Hi there!".to_string(),
1263 },
1264 )
1265 .await
1266 .unwrap();
1267
1268 let complete_node = create_event_node(
1269 arbor,
1270 &tree_id,
1271 &content_node,
1272 &NodeEvent::AssistantComplete { usage: None },
1273 )
1274 .await
1275 .unwrap();
1276
1277 let messages = storage
1279 .render_messages(&tree_id, &root, &complete_node)
1280 .await
1281 .unwrap();
1282
1283 assert_eq!(messages.len(), 2);
1285
1286 assert_eq!(messages[0].role, "user");
1288 assert_eq!(messages[0].content.len(), 1);
1289 if let ContentBlock::Text { text } = &messages[0].content[0] {
1290 assert_eq!(text, "Hello");
1291 } else {
1292 panic!("Expected text content block");
1293 }
1294
1295 assert_eq!(messages[1].role, "assistant");
1297 assert_eq!(messages[1].content.len(), 1);
1298 if let ContentBlock::Text { text } = &messages[1].content[0] {
1299 assert_eq!(text, "Hi there!");
1300 } else {
1301 panic!("Expected text content block");
1302 }
1303 }
1304
1305 #[tokio::test]
1306 async fn test_render_with_tool_use() {
1307 let (storage, _temp_path) = create_test_storage().await;
1308 let arbor = storage.arbor();
1309
1310 let tree_id = arbor.tree_create(None, "test-tool-session").await.unwrap();
1312 let tree = arbor.tree_get(&tree_id).await.unwrap();
1313 let root = tree.root;
1314
1315 let user_node = create_event_node(
1319 arbor,
1320 &tree_id,
1321 &root,
1322 &NodeEvent::UserMessage {
1323 content: "Write a file".to_string(),
1324 },
1325 )
1326 .await
1327 .unwrap();
1328
1329 let assistant_start1 = create_event_node(
1330 arbor,
1331 &tree_id,
1332 &user_node,
1333 &NodeEvent::AssistantStart,
1334 )
1335 .await
1336 .unwrap();
1337
1338 let content1 = create_event_node(
1339 arbor,
1340 &tree_id,
1341 &assistant_start1,
1342 &NodeEvent::ContentText {
1343 text: "I'll write the file".to_string(),
1344 },
1345 )
1346 .await
1347 .unwrap();
1348
1349 let tool_use = create_event_node(
1350 arbor,
1351 &tree_id,
1352 &content1,
1353 &NodeEvent::ContentToolUse {
1354 id: "toolu_123".to_string(),
1355 name: "Write".to_string(),
1356 input: serde_json::json!({"file_path": "/tmp/test.txt", "content": "hello"}),
1357 },
1358 )
1359 .await
1360 .unwrap();
1361
1362 let complete1 = create_event_node(
1363 arbor,
1364 &tree_id,
1365 &tool_use,
1366 &NodeEvent::AssistantComplete { usage: None },
1367 )
1368 .await
1369 .unwrap();
1370
1371 let tool_result = create_event_node(
1372 arbor,
1373 &tree_id,
1374 &complete1,
1375 &NodeEvent::UserToolResult {
1376 tool_use_id: "toolu_123".to_string(),
1377 content: "File written successfully".to_string(),
1378 is_error: false,
1379 },
1380 )
1381 .await
1382 .unwrap();
1383
1384 let assistant_start2 = create_event_node(
1385 arbor,
1386 &tree_id,
1387 &tool_result,
1388 &NodeEvent::AssistantStart,
1389 )
1390 .await
1391 .unwrap();
1392
1393 let content2 = create_event_node(
1394 arbor,
1395 &tree_id,
1396 &assistant_start2,
1397 &NodeEvent::ContentText {
1398 text: "Done!".to_string(),
1399 },
1400 )
1401 .await
1402 .unwrap();
1403
1404 let complete2 = create_event_node(
1405 arbor,
1406 &tree_id,
1407 &content2,
1408 &NodeEvent::AssistantComplete { usage: None },
1409 )
1410 .await
1411 .unwrap();
1412
1413 let messages = storage
1415 .render_messages(&tree_id, &root, &complete2)
1416 .await
1417 .unwrap();
1418
1419 assert_eq!(messages.len(), 4);
1421
1422 assert_eq!(messages[0].role, "user");
1424 assert_eq!(messages[0].content.len(), 1);
1425
1426 assert_eq!(messages[1].role, "assistant");
1428 assert_eq!(messages[1].content.len(), 2);
1429 if let ContentBlock::Text { text } = &messages[1].content[0] {
1430 assert_eq!(text, "I'll write the file");
1431 } else {
1432 panic!("Expected text content block");
1433 }
1434 if let ContentBlock::ToolUse { id, name, .. } = &messages[1].content[1] {
1435 assert_eq!(id, "toolu_123");
1436 assert_eq!(name, "Write");
1437 } else {
1438 panic!("Expected tool_use content block");
1439 }
1440
1441 assert_eq!(messages[2].role, "user");
1443 assert_eq!(messages[2].content.len(), 1);
1444 if let ContentBlock::ToolResult {
1445 tool_use_id,
1446 content,
1447 is_error,
1448 } = &messages[2].content[0]
1449 {
1450 assert_eq!(tool_use_id, "toolu_123");
1451 assert_eq!(content, "File written successfully");
1452 assert_eq!(*is_error, false);
1453 } else {
1454 panic!("Expected tool_result content block");
1455 }
1456
1457 assert_eq!(messages[3].role, "assistant");
1459 assert_eq!(messages[3].content.len(), 1);
1460 if let ContentBlock::Text { text } = &messages[3].content[0] {
1461 assert_eq!(text, "Done!");
1462 } else {
1463 panic!("Expected text content block");
1464 }
1465 }
1466
1467 #[tokio::test]
1468 async fn test_render_with_thinking() {
1469 let (storage, _temp_path) = create_test_storage().await;
1470 let arbor = storage.arbor();
1471
1472 let tree_id = arbor.tree_create(None, "test-thinking").await.unwrap();
1474 let tree = arbor.tree_get(&tree_id).await.unwrap();
1475 let root = tree.root;
1476
1477 let user_node = create_event_node(
1479 arbor,
1480 &tree_id,
1481 &root,
1482 &NodeEvent::UserMessage {
1483 content: "Solve this problem".to_string(),
1484 },
1485 )
1486 .await
1487 .unwrap();
1488
1489 let assistant_start = create_event_node(
1490 arbor,
1491 &tree_id,
1492 &user_node,
1493 &NodeEvent::AssistantStart,
1494 )
1495 .await
1496 .unwrap();
1497
1498 let thinking = create_event_node(
1499 arbor,
1500 &tree_id,
1501 &assistant_start,
1502 &NodeEvent::ContentThinking {
1503 thinking: "Let me think about this...".to_string(),
1504 },
1505 )
1506 .await
1507 .unwrap();
1508
1509 let content = create_event_node(
1510 arbor,
1511 &tree_id,
1512 &thinking,
1513 &NodeEvent::ContentText {
1514 text: "Here's the solution".to_string(),
1515 },
1516 )
1517 .await
1518 .unwrap();
1519
1520 let complete = create_event_node(
1521 arbor,
1522 &tree_id,
1523 &content,
1524 &NodeEvent::AssistantComplete { usage: None },
1525 )
1526 .await
1527 .unwrap();
1528
1529 let messages = storage
1531 .render_messages(&tree_id, &root, &complete)
1532 .await
1533 .unwrap();
1534
1535 assert_eq!(messages.len(), 2);
1537
1538 assert_eq!(messages[1].role, "assistant");
1540 assert_eq!(messages[1].content.len(), 2);
1541 if let ContentBlock::Thinking { thinking } = &messages[1].content[0] {
1542 assert_eq!(thinking, "Let me think about this...");
1543 } else {
1544 panic!("Expected thinking content block");
1545 }
1546 if let ContentBlock::Text { text } = &messages[1].content[1] {
1547 assert_eq!(text, "Here's the solution");
1548 } else {
1549 panic!("Expected text content block");
1550 }
1551 }
1552
1553 #[tokio::test]
1554 async fn test_render_partial_path() {
1555 let (storage, _temp_path) = create_test_storage().await;
1556 let arbor = storage.arbor();
1557
1558 let tree_id = arbor.tree_create(None, "test-partial").await.unwrap();
1560 let tree = arbor.tree_get(&tree_id).await.unwrap();
1561 let root = tree.root;
1562
1563 let user1 = create_event_node(
1565 arbor,
1566 &tree_id,
1567 &root,
1568 &NodeEvent::UserMessage {
1569 content: "First message".to_string(),
1570 },
1571 )
1572 .await
1573 .unwrap();
1574
1575 let assistant_start1 = create_event_node(
1576 arbor,
1577 &tree_id,
1578 &user1,
1579 &NodeEvent::AssistantStart,
1580 )
1581 .await
1582 .unwrap();
1583
1584 let content1 = create_event_node(
1585 arbor,
1586 &tree_id,
1587 &assistant_start1,
1588 &NodeEvent::ContentText {
1589 text: "First response".to_string(),
1590 },
1591 )
1592 .await
1593 .unwrap();
1594
1595 let complete1 = create_event_node(
1596 arbor,
1597 &tree_id,
1598 &content1,
1599 &NodeEvent::AssistantComplete { usage: None },
1600 )
1601 .await
1602 .unwrap();
1603
1604 let user2 = create_event_node(
1605 arbor,
1606 &tree_id,
1607 &complete1,
1608 &NodeEvent::UserMessage {
1609 content: "Second message".to_string(),
1610 },
1611 )
1612 .await
1613 .unwrap();
1614
1615 let assistant_start2 = create_event_node(
1616 arbor,
1617 &tree_id,
1618 &user2,
1619 &NodeEvent::AssistantStart,
1620 )
1621 .await
1622 .unwrap();
1623
1624 let content2 = create_event_node(
1625 arbor,
1626 &tree_id,
1627 &assistant_start2,
1628 &NodeEvent::ContentText {
1629 text: "Second response".to_string(),
1630 },
1631 )
1632 .await
1633 .unwrap();
1634
1635 let complete2 = create_event_node(
1636 arbor,
1637 &tree_id,
1638 &content2,
1639 &NodeEvent::AssistantComplete { usage: None },
1640 )
1641 .await
1642 .unwrap();
1643
1644 let messages = storage
1646 .render_messages(&tree_id, &user2, &complete2)
1647 .await
1648 .unwrap();
1649
1650 assert_eq!(messages.len(), 2);
1652 assert_eq!(messages[0].role, "user");
1653 if let ContentBlock::Text { text } = &messages[0].content[0] {
1654 assert_eq!(text, "Second message");
1655 } else {
1656 panic!("Expected text content block");
1657 }
1658 }
1659}