1use super::types::{
2 BufferedEvent, ChatEvent, ClaudeCodeConfig, ClaudeCodeError, ClaudeCodeHandle, ClaudeCodeId,
3 ClaudeCodeInfo, ClaudeMessage, ContentBlock, Message, MessageId, MessageRole, Model,
4 NodeEvent, Position, StreamId, StreamInfo, StreamStatus,
5};
6use crate::activations::arbor::{ArborStorage, NodeId, NodeType, TreeId};
7use crate::activations::storage::init_sqlite_pool;
8use crate::activation_db_path_from_module;
9use serde_json::Value;
10use sqlx::{sqlite::SqlitePool, Row};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
18fn parse_err(context: &'static str, detail: impl std::fmt::Display) -> ClaudeCodeError {
20 ClaudeCodeError::Parse { context, detail: detail.to_string() }
21}
22
23fn db_err(operation: &'static str, source: sqlx::Error) -> ClaudeCodeError {
25 ClaudeCodeError::Database { operation, source }
26}
27
28#[derive(Debug, Clone)]
30pub struct ClaudeCodeStorageConfig {
31 pub db_path: PathBuf,
33}
34
35impl Default for ClaudeCodeStorageConfig {
36 fn default() -> Self {
37 Self {
38 db_path: activation_db_path_from_module!("claudecode.db"),
39 }
40 }
41}
42
43#[derive(Debug)]
45struct ActiveStreamBuffer {
46 info: StreamInfo,
48 events: Vec<BufferedEvent>,
50}
51
52pub struct ClaudeCodeStorage {
54 pool: SqlitePool,
55 arbor: Arc<ArborStorage>,
56 streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>>,
58}
59
60impl ClaudeCodeStorage {
61 pub async fn new(
63 config: ClaudeCodeStorageConfig,
64 arbor: Arc<ArborStorage>,
65 ) -> Result<Self, ClaudeCodeError> {
66 let pool = init_sqlite_pool(config.db_path).await
67 .map_err(|e| db_err("connect", sqlx::Error::Configuration(e.into())))?;
68
69 let storage = Self {
70 pool,
71 arbor,
72 streams: RwLock::new(HashMap::new()),
73 };
74 storage.run_migrations().await?;
75
76 Ok(storage)
77 }
78
79 async fn run_migrations(&self) -> Result<(), ClaudeCodeError> {
81 sqlx::query(
82 r#"
83 CREATE TABLE IF NOT EXISTS claudecode_sessions (
84 id TEXT PRIMARY KEY,
85 name TEXT NOT NULL UNIQUE,
86 claude_session_id TEXT,
87 loopback_session_id TEXT,
88 tree_id TEXT NOT NULL,
89 canonical_head TEXT NOT NULL,
90 working_dir TEXT NOT NULL,
91 model TEXT NOT NULL,
92 system_prompt TEXT,
93 mcp_config TEXT,
94 loopback_enabled INTEGER NOT NULL DEFAULT 0,
95 metadata TEXT,
96 created_at INTEGER NOT NULL,
97 updated_at INTEGER NOT NULL
98 );
99
100 CREATE TABLE IF NOT EXISTS claudecode_messages (
101 id TEXT PRIMARY KEY,
102 session_id TEXT NOT NULL,
103 role TEXT NOT NULL,
104 content TEXT NOT NULL,
105 model_id TEXT,
106 input_tokens INTEGER,
107 output_tokens INTEGER,
108 cost_usd REAL,
109 created_at INTEGER NOT NULL,
110 FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
111 );
112
113 CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_name ON claudecode_sessions(name);
114 CREATE INDEX IF NOT EXISTS idx_claudecode_sessions_tree ON claudecode_sessions(tree_id);
115 CREATE INDEX IF NOT EXISTS idx_claudecode_messages_session ON claudecode_messages(session_id);
116
117 CREATE TABLE IF NOT EXISTS claudecode_unknown_events (
118 id TEXT PRIMARY KEY,
119 session_id TEXT,
120 event_type TEXT NOT NULL,
121 data TEXT NOT NULL,
122 created_at INTEGER NOT NULL,
123 FOREIGN KEY (session_id) REFERENCES claudecode_sessions(id) ON DELETE CASCADE
124 );
125
126 CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_session ON claudecode_unknown_events(session_id);
127 CREATE INDEX IF NOT EXISTS idx_claudecode_unknown_events_type ON claudecode_unknown_events(event_type);
128 "#,
129 )
130 .execute(&self.pool)
131 .await
132 .map_err(|e| db_err("run migrations", e))?;
133
134 let _ = sqlx::query(
136 "ALTER TABLE claudecode_sessions ADD COLUMN loopback_session_id TEXT",
137 )
138 .execute(&self.pool)
139 .await;
140
141 Ok(())
142 }
143
144 pub fn arbor(&self) -> &ArborStorage {
146 &self.arbor
147 }
148
149 pub async fn session_create(
155 &self,
156 name: String,
157 working_dir: String,
158 model: Model,
159 system_prompt: Option<String>,
160 mcp_config: Option<Value>,
161 loopback_enabled: bool,
162 claude_session_id: Option<String>,
163 loopback_session_id: Option<String>,
164 metadata: Option<Value>,
165 ) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
166 let session_id = ClaudeCodeId::new_v4();
167 let now = current_timestamp();
168
169 let tree_id = self
171 .arbor
172 .tree_create(metadata.clone(), &session_id.to_string())
173 .await
174 .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
175
176 let tree = self
178 .arbor
179 .tree_get(&tree_id)
180 .await
181 .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
182 let head = Position::new(tree_id, tree.root);
183
184 let metadata_json = metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
185 let mcp_config_json = mcp_config.as_ref().map(|m| serde_json::to_string(m).unwrap());
186
187 let final_name = match sqlx::query(
189 "INSERT INTO claudecode_sessions (id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
190 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
191 )
192 .bind(session_id.to_string())
193 .bind(&name)
194 .bind(&claude_session_id)
195 .bind(&loopback_session_id)
196 .bind(head.tree_id.to_string())
197 .bind(head.node_id.to_string())
198 .bind(&working_dir)
199 .bind(model.as_str())
200 .bind(&system_prompt)
201 .bind(mcp_config_json.clone())
202 .bind(loopback_enabled)
203 .bind(metadata_json.clone())
204 .bind(now)
205 .bind(now)
206 .execute(&self.pool)
207 .await
208 {
209 Ok(_) => name,
210 Err(e) if e.to_string().contains("UNIQUE constraint failed") => {
211 let unique_name = format!("{}#{}", name, session_id);
213
214 sqlx::query(
215 "INSERT INTO claudecode_sessions (id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at)
216 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
217 )
218 .bind(session_id.to_string())
219 .bind(&unique_name)
220 .bind(&claude_session_id)
221 .bind(&loopback_session_id)
222 .bind(head.tree_id.to_string())
223 .bind(head.node_id.to_string())
224 .bind(&working_dir)
225 .bind(model.as_str())
226 .bind(&system_prompt)
227 .bind(mcp_config_json)
228 .bind(loopback_enabled)
229 .bind(metadata_json)
230 .bind(now)
231 .bind(now)
232 .execute(&self.pool)
233 .await
234 .map_err(|e| db_err("create session (unique name)", e))?;
235
236 unique_name
237 }
238 Err(e) => return Err(db_err("create session", e)),
239 };
240
241 Ok(ClaudeCodeConfig {
242 id: session_id,
243 name: final_name,
244 claude_session_id,
245 loopback_session_id,
246 head,
247 working_dir,
248 model,
249 system_prompt,
250 mcp_config,
251 loopback_enabled,
252 metadata,
253 created_at: now,
254 updated_at: now,
255 })
256 }
257
258 pub async fn session_get(&self, session_id: &ClaudeCodeId) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
260 let row = sqlx::query(
261 "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
262 FROM claudecode_sessions WHERE id = ?",
263 )
264 .bind(session_id.to_string())
265 .fetch_optional(&self.pool)
266 .await
267 .map_err(|e| db_err("fetch session", e))?
268 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() })?;
269
270 self.row_to_config(row)
271 }
272
273 pub async fn session_get_by_name(&self, name: &str) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
275 if let Some(row) = sqlx::query(
277 "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
278 FROM claudecode_sessions WHERE name = ?",
279 )
280 .bind(name)
281 .fetch_optional(&self.pool)
282 .await
283 .map_err(|e| db_err("fetch session by name", e))?
284 {
285 return self.row_to_config(row);
286 }
287
288 let pattern = format!("{}%", name);
290 let rows = sqlx::query(
291 "SELECT id, name, claude_session_id, loopback_session_id, tree_id, canonical_head, working_dir, model, system_prompt, mcp_config, loopback_enabled, metadata, created_at, updated_at
292 FROM claudecode_sessions WHERE name LIKE ?",
293 )
294 .bind(&pattern)
295 .fetch_all(&self.pool)
296 .await
297 .map_err(|e| db_err("fetch session by pattern", e))?;
298
299 match rows.len() {
300 0 => Err(ClaudeCodeError::SessionNotFound { identifier: name.to_string() }),
301 1 => self.row_to_config(rows.into_iter().next().unwrap()),
302 _ => {
303 let matches: Vec<String> = rows.iter().map(|r| r.get("name")).collect();
304 Err(ClaudeCodeError::AmbiguousSession {
305 name: name.to_string(),
306 matches: matches.join(", "),
307 })
308 }
309 }
310 }
311
312 pub async fn session_list(&self) -> Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> {
314 let rows = sqlx::query(
315 "SELECT id, name, claude_session_id, tree_id, canonical_head, working_dir, model, loopback_enabled, created_at
316 FROM claudecode_sessions ORDER BY created_at DESC",
317 )
318 .fetch_all(&self.pool)
319 .await
320 .map_err(|e| db_err("list sessions", e))?;
321
322 let sessions: Result<Vec<ClaudeCodeInfo>, ClaudeCodeError> = rows
323 .iter()
324 .map(|row| {
325 let id_str: String = row.get("id");
326 let tree_id_str: String = row.get("tree_id");
327 let head_str: String = row.get("canonical_head");
328 let model_str: String = row.get("model");
329 let loopback: i32 = row.get("loopback_enabled");
330
331 let tree_id = TreeId::parse_str(&tree_id_str)
332 .map_err(|e| parse_err("tree ID", e))?;
333 let node_id = NodeId::parse_str(&head_str)
334 .map_err(|e| parse_err("node ID", e))?;
335 let model = Model::from_str(&model_str)
336 .ok_or_else(|| parse_err("model", &model_str))?;
337
338 Ok(ClaudeCodeInfo {
339 id: Uuid::parse_str(&id_str).map_err(|e| parse_err("session ID", e))?,
340 name: row.get("name"),
341 model,
342 head: Position::new(tree_id, node_id),
343 claude_session_id: row.get("claude_session_id"),
344 working_dir: row.get("working_dir"),
345 loopback_enabled: loopback != 0,
346 created_at: row.get("created_at"),
347 })
348 })
349 .collect();
350
351 sessions
352 }
353
354 pub async fn session_update_head(
356 &self,
357 session_id: &ClaudeCodeId,
358 new_head: NodeId,
359 claude_session_id: Option<String>,
360 ) -> Result<(), ClaudeCodeError> {
361 let now = current_timestamp();
362
363 let result = if let Some(claude_id) = claude_session_id {
364 sqlx::query(
365 "UPDATE claudecode_sessions SET canonical_head = ?, claude_session_id = ?, updated_at = ? WHERE id = ?",
366 )
367 .bind(new_head.to_string())
368 .bind(claude_id)
369 .bind(now)
370 .bind(session_id.to_string())
371 .execute(&self.pool)
372 .await
373 } else {
374 sqlx::query(
375 "UPDATE claudecode_sessions SET canonical_head = ?, updated_at = ? WHERE id = ?",
376 )
377 .bind(new_head.to_string())
378 .bind(now)
379 .bind(session_id.to_string())
380 .execute(&self.pool)
381 .await
382 }
383 .map_err(|e| db_err("update session head", e))?;
384
385 if result.rows_affected() == 0 {
386 return Err(ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() });
387 }
388
389 Ok(())
390 }
391
392 pub async fn session_update_loopback_id(
394 &self,
395 session_id: &ClaudeCodeId,
396 loopback_session_id: String,
397 ) -> Result<(), ClaudeCodeError> {
398 let now = current_timestamp();
399 sqlx::query(
400 "UPDATE claudecode_sessions SET loopback_session_id = ?, updated_at = ? WHERE id = ?",
401 )
402 .bind(&loopback_session_id)
403 .bind(now)
404 .bind(session_id.to_string())
405 .execute(&self.pool)
406 .await
407 .map_err(|e| db_err("update loopback_session_id", e))?;
408 Ok(())
409 }
410
411 pub async fn session_update_claude_id(
413 &self,
414 session_id: &ClaudeCodeId,
415 claude_session_id: String,
416 ) -> Result<(), ClaudeCodeError> {
417 let now = current_timestamp();
418 sqlx::query(
419 "UPDATE claudecode_sessions SET claude_session_id = ?, updated_at = ? WHERE id = ?",
420 )
421 .bind(&claude_session_id)
422 .bind(now)
423 .bind(session_id.to_string())
424 .execute(&self.pool)
425 .await
426 .map_err(|e| db_err("update claude_session_id", e))?;
427 Ok(())
428 }
429
430 pub async fn session_update(
432 &self,
433 session_id: &ClaudeCodeId,
434 name: Option<String>,
435 model: Option<Model>,
436 system_prompt: Option<Option<String>>,
437 mcp_config: Option<Value>,
438 metadata: Option<Value>,
439 ) -> Result<(), ClaudeCodeError> {
440 let now = current_timestamp();
441 let current = self.session_get(session_id).await?;
442
443 let new_name = name.unwrap_or(current.name);
444 let new_model = model.unwrap_or(current.model);
445 let new_prompt = system_prompt.unwrap_or(current.system_prompt);
446 let new_mcp = mcp_config.or(current.mcp_config);
447 let new_metadata = metadata.or(current.metadata);
448
449 let mcp_json = new_mcp.as_ref().map(|m| serde_json::to_string(m).unwrap());
450 let metadata_json = new_metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
451
452 sqlx::query(
453 "UPDATE claudecode_sessions SET name = ?, model = ?, system_prompt = ?, mcp_config = ?, metadata = ?, updated_at = ? WHERE id = ?",
454 )
455 .bind(&new_name)
456 .bind(new_model.as_str())
457 .bind(&new_prompt)
458 .bind(mcp_json)
459 .bind(metadata_json)
460 .bind(now)
461 .bind(session_id.to_string())
462 .execute(&self.pool)
463 .await
464 .map_err(|e| db_err("update session", e))?;
465
466 Ok(())
467 }
468
469 pub async fn session_delete(&self, session_id: &ClaudeCodeId) -> Result<(), ClaudeCodeError> {
471 let result = sqlx::query("DELETE FROM claudecode_sessions WHERE id = ?")
472 .bind(session_id.to_string())
473 .execute(&self.pool)
474 .await
475 .map_err(|e| db_err("delete session", e))?;
476
477 if result.rows_affected() == 0 {
478 return Err(ClaudeCodeError::SessionNotFound { identifier: session_id.to_string() });
479 }
480
481 Ok(())
482 }
483
484 pub async fn message_create(
490 &self,
491 session_id: &ClaudeCodeId,
492 role: MessageRole,
493 content: String,
494 model_id: Option<String>,
495 input_tokens: Option<i64>,
496 output_tokens: Option<i64>,
497 cost_usd: Option<f64>,
498 ) -> Result<Message, ClaudeCodeError> {
499 let message_id = MessageId::new_v4();
500 let now = current_timestamp();
501
502 sqlx::query(
503 "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
504 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
505 )
506 .bind(message_id.to_string())
507 .bind(session_id.to_string())
508 .bind(role.as_str())
509 .bind(&content)
510 .bind(&model_id)
511 .bind(input_tokens)
512 .bind(output_tokens)
513 .bind(cost_usd)
514 .bind(now)
515 .execute(&self.pool)
516 .await
517 .map_err(|e| db_err("create message", e))?;
518
519 Ok(Message {
520 id: message_id,
521 session_id: *session_id,
522 role,
523 content,
524 created_at: now,
525 model_id,
526 input_tokens,
527 output_tokens,
528 cost_usd,
529 })
530 }
531
532 pub async fn message_create_ephemeral(
534 &self,
535 session_id: &ClaudeCodeId,
536 role: MessageRole,
537 content: String,
538 model_id: Option<String>,
539 input_tokens: Option<i64>,
540 output_tokens: Option<i64>,
541 cost_usd: Option<f64>,
542 ) -> Result<Message, ClaudeCodeError> {
543 let message_id = MessageId::new_v4();
544 let now = current_timestamp();
545
546 let ephemeral_marker = -now;
550
551 sqlx::query(
552 "INSERT INTO claudecode_messages (id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at)
553 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
554 )
555 .bind(message_id.to_string())
556 .bind(session_id.to_string())
557 .bind(role.as_str())
558 .bind(&content)
559 .bind(&model_id)
560 .bind(input_tokens)
561 .bind(output_tokens)
562 .bind(cost_usd)
563 .bind(ephemeral_marker)
564 .execute(&self.pool)
565 .await
566 .map_err(|e| db_err("create ephemeral message", e))?;
567
568 Ok(Message {
569 id: message_id,
570 session_id: *session_id,
571 role,
572 content,
573 created_at: ephemeral_marker,
574 model_id,
575 input_tokens,
576 output_tokens,
577 cost_usd,
578 })
579 }
580
581 pub async fn message_get(&self, message_id: &MessageId) -> Result<Message, ClaudeCodeError> {
583 let row = sqlx::query(
584 "SELECT id, session_id, role, content, model_id, input_tokens, output_tokens, cost_usd, created_at
585 FROM claudecode_messages WHERE id = ?",
586 )
587 .bind(message_id.to_string())
588 .fetch_optional(&self.pool)
589 .await
590 .map_err(|e| db_err("fetch message", e))?
591 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("message:{}", message_id) })?;
592
593 self.row_to_message(row)
594 }
595
596 pub async fn resolve_message_handle(&self, identifier: &str) -> Result<Message, ClaudeCodeError> {
599 let parts: Vec<&str> = identifier.splitn(3, ':').collect();
600 if parts.len() < 2 {
601 return Err(parse_err("message handle", format!("invalid format: {}", identifier)));
602 }
603
604 let msg_part = parts[0];
605 if !msg_part.starts_with("msg-") {
606 return Err(parse_err("message handle", format!("invalid prefix: {}", identifier)));
607 }
608
609 let message_id_str = &msg_part[4..];
610 let message_id = Uuid::parse_str(message_id_str)
611 .map_err(|e| parse_err("message ID in handle", e))?;
612
613 self.message_get(&message_id).await
614 }
615
616 pub fn message_to_handle(message: &Message, name: &str) -> crate::types::Handle {
621 ClaudeCodeHandle::Message {
622 message_id: format!("msg-{}", message.id),
623 role: message.role.as_str().to_string(),
624 name: name.to_string(),
625 }.to_handle()
626 }
627
628 pub async fn unknown_event_store(
634 &self,
635 session_id: Option<&ClaudeCodeId>,
636 event_type: &str,
637 data: &Value,
638 ) -> Result<String, ClaudeCodeError> {
639 let id = Uuid::new_v4().to_string();
640 let now = current_timestamp();
641 let data_json = serde_json::to_string(data)?;
642
643 sqlx::query(
644 "INSERT INTO claudecode_unknown_events (id, session_id, event_type, data, created_at)
645 VALUES (?, ?, ?, ?, ?)",
646 )
647 .bind(&id)
648 .bind(session_id.map(|s| s.to_string()))
649 .bind(event_type)
650 .bind(&data_json)
651 .bind(now)
652 .execute(&self.pool)
653 .await
654 .map_err(|e| db_err("store unknown event", e))?;
655
656 Ok(id)
657 }
658
659 pub async fn unknown_event_get(&self, id: &str) -> Result<(String, Value), ClaudeCodeError> {
661 let row = sqlx::query(
662 "SELECT event_type, data FROM claudecode_unknown_events WHERE id = ?",
663 )
664 .bind(id)
665 .fetch_optional(&self.pool)
666 .await
667 .map_err(|e| db_err("fetch unknown event", e))?
668 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("unknown_event:{}", id) })?;
669
670 let event_type: String = row.get("event_type");
671 let data_json: String = row.get("data");
672 let data: Value = serde_json::from_str(&data_json)?;
673
674 Ok((event_type, data))
675 }
676
677 pub async fn unknown_events_by_type(&self, event_type: &str) -> Result<Vec<(String, Value)>, ClaudeCodeError> {
679 let rows = sqlx::query(
680 "SELECT id, data FROM claudecode_unknown_events WHERE event_type = ? ORDER BY created_at DESC",
681 )
682 .bind(event_type)
683 .fetch_all(&self.pool)
684 .await
685 .map_err(|e| db_err("list unknown events", e))?;
686
687 rows.iter()
688 .map(|row| {
689 let id: String = row.get("id");
690 let data_json: String = row.get("data");
691 let data: Value = serde_json::from_str(&data_json)?;
692 Ok((id, data))
693 })
694 .collect()
695 }
696
697 pub async fn stream_create(
703 &self,
704 session_id: ClaudeCodeId,
705 ) -> Result<StreamId, ClaudeCodeError> {
706 let stream_id = StreamId::new_v4();
707 let now = current_timestamp();
708
709 let info = StreamInfo {
710 stream_id,
711 session_id,
712 status: StreamStatus::Running,
713 user_position: None,
714 event_count: 0,
715 read_position: 0,
716 started_at: now,
717 ended_at: None,
718 error: None,
719 };
720
721 let buffer = ActiveStreamBuffer {
722 info,
723 events: Vec::new(),
724 };
725
726 let mut streams = self.streams.write().await;
727 streams.insert(stream_id, buffer);
728
729 Ok(stream_id)
730 }
731
732 pub async fn stream_set_user_position(
734 &self,
735 stream_id: &StreamId,
736 position: Position,
737 ) -> Result<(), ClaudeCodeError> {
738 let mut streams = self.streams.write().await;
739 let buffer = streams.get_mut(stream_id)
740 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
741 buffer.info.user_position = Some(position);
742 Ok(())
743 }
744
745 pub async fn stream_push_event(
747 &self,
748 stream_id: &StreamId,
749 event: ChatEvent,
750 ) -> Result<u64, ClaudeCodeError> {
751 let now = current_timestamp();
752 let mut streams = self.streams.write().await;
753 let buffer = streams.get_mut(stream_id)
754 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
755
756 let seq = buffer.info.event_count;
757 buffer.events.push(BufferedEvent {
758 seq,
759 event,
760 timestamp: now,
761 });
762 buffer.info.event_count += 1;
763
764 Ok(seq)
765 }
766
767 pub async fn stream_set_status(
769 &self,
770 stream_id: &StreamId,
771 status: StreamStatus,
772 error: Option<String>,
773 ) -> Result<(), ClaudeCodeError> {
774 let now = current_timestamp();
775 let mut streams = self.streams.write().await;
776 let buffer = streams.get_mut(stream_id)
777 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
778
779 buffer.info.status = status;
780 if status == StreamStatus::Complete || status == StreamStatus::Failed {
781 buffer.info.ended_at = Some(now);
782 }
783 if let Some(e) = error {
784 buffer.info.error = Some(e);
785 }
786
787 Ok(())
788 }
789
790 pub async fn stream_get_info(&self, stream_id: &StreamId) -> Result<StreamInfo, ClaudeCodeError> {
792 let streams = self.streams.read().await;
793 let buffer = streams.get(stream_id)
794 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
795 Ok(buffer.info.clone())
796 }
797
798 pub async fn stream_poll(
801 &self,
802 stream_id: &StreamId,
803 from_seq: Option<u64>,
804 limit: Option<usize>,
805 ) -> Result<(StreamInfo, Vec<BufferedEvent>), ClaudeCodeError> {
806 let mut streams = self.streams.write().await;
807 let buffer = streams.get_mut(stream_id)
808 .ok_or_else(|| ClaudeCodeError::SessionNotFound { identifier: format!("stream:{}", stream_id) })?;
809
810 let start = from_seq.unwrap_or(buffer.info.read_position) as usize;
811 let max_events = limit.unwrap_or(100);
812
813 let events: Vec<BufferedEvent> = buffer.events
814 .iter()
815 .skip(start)
816 .take(max_events)
817 .cloned()
818 .collect();
819
820 if !events.is_empty() {
822 let last_seq = events.last().unwrap().seq;
823 buffer.info.read_position = last_seq + 1;
824 }
825
826 Ok((buffer.info.clone(), events))
827 }
828
829 pub async fn stream_list(&self) -> Vec<StreamInfo> {
831 let streams = self.streams.read().await;
832 streams.values().map(|b| b.info.clone()).collect()
833 }
834
835 pub async fn stream_list_for_session(&self, session_id: &ClaudeCodeId) -> Vec<StreamInfo> {
837 let streams = self.streams.read().await;
838 streams
839 .values()
840 .filter(|b| &b.info.session_id == session_id)
841 .map(|b| b.info.clone())
842 .collect()
843 }
844
845 pub async fn stream_cleanup(&self, stream_id: &StreamId) -> Option<StreamInfo> {
848 let mut streams = self.streams.write().await;
849 streams.remove(stream_id).map(|b| b.info)
850 }
851
852 pub async fn stream_exists(&self, stream_id: &StreamId) -> bool {
854 let streams = self.streams.read().await;
855 streams.contains_key(stream_id)
856 }
857
858 pub async fn render_messages(
873 &self,
874 tree_id: &TreeId,
875 start: &NodeId,
876 end: &NodeId,
877 ) -> Result<Vec<ClaudeMessage>, ClaudeCodeError> {
878 let node_ids = self
880 .arbor
881 .node_get_path(tree_id, end)
882 .await
883 .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
884
885 let start_idx = node_ids
887 .iter()
888 .position(|id| id == start)
889 .ok_or_else(|| ClaudeCodeError::Arbor("start node not found in path from root to end".to_string()))?;
890
891 let node_ids = &node_ids[start_idx..];
892
893 let mut messages: Vec<ClaudeMessage> = Vec::new();
895 let mut current_role: Option<String> = None;
896 let mut current_content: Vec<ContentBlock> = Vec::new();
897
898 for node_id in node_ids {
899 let node = self
901 .arbor
902 .node_get(tree_id, node_id)
903 .await
904 .map_err(|e| ClaudeCodeError::Arbor(e.to_string()))?;
905
906 let content = match &node.data {
908 NodeType::Text { content } => content,
909 NodeType::External { .. } => {
910 continue;
912 }
913 };
914
915 if content.trim().is_empty() {
917 continue;
918 }
919
920 let event: NodeEvent = serde_json::from_str(content)?;
922
923 match event {
924 NodeEvent::UserMessage { content } => {
925 if let Some(role) = current_role.take() {
927 if !current_content.is_empty() {
928 messages.push(ClaudeMessage {
929 role,
930 content: current_content.clone(),
931 });
932 current_content.clear();
933 }
934 }
935
936 current_role = Some("user".to_string());
938 current_content.push(ContentBlock::Text { text: content });
939 }
940
941 NodeEvent::AssistantStart => {
942 if let Some(role) = current_role.take() {
944 if !current_content.is_empty() {
945 messages.push(ClaudeMessage {
946 role,
947 content: current_content.clone(),
948 });
949 current_content.clear();
950 }
951 }
952
953 current_role = Some("assistant".to_string());
955 }
956
957 NodeEvent::ContentText { text } => {
958 current_content.push(ContentBlock::Text { text });
960 }
961
962 NodeEvent::ContentToolUse { id, name, input } => {
963 current_content.push(ContentBlock::ToolUse { id, name, input });
965 }
966
967 NodeEvent::ContentThinking { thinking } => {
968 current_content.push(ContentBlock::Thinking { thinking });
970 }
971
972 NodeEvent::UserToolResult {
973 tool_use_id,
974 content,
975 is_error,
976 } => {
977 if let Some(role) = current_role.take() {
979 if !current_content.is_empty() {
980 messages.push(ClaudeMessage {
981 role,
982 content: current_content.clone(),
983 });
984 current_content.clear();
985 }
986 }
987
988 messages.push(ClaudeMessage {
990 role: "user".to_string(),
991 content: vec![ContentBlock::ToolResult {
992 tool_use_id,
993 content,
994 is_error,
995 }],
996 });
997 }
998
999 NodeEvent::AssistantComplete { usage: _ } => {
1000 if let Some(role) = current_role.take() {
1002 if !current_content.is_empty() {
1003 messages.push(ClaudeMessage {
1004 role,
1005 content: current_content.clone(),
1006 });
1007 current_content.clear();
1008 }
1009 }
1010 }
1011
1012 NodeEvent::LaunchCommand { .. } | NodeEvent::ClaudeStderr { .. } => {}
1014 }
1015 }
1016
1017 if let Some(role) = current_role {
1019 if !current_content.is_empty() {
1020 messages.push(ClaudeMessage {
1021 role,
1022 content: current_content,
1023 });
1024 }
1025 }
1026
1027 Ok(messages)
1028 }
1029
1030 fn row_to_message(&self, row: sqlx::sqlite::SqliteRow) -> Result<Message, ClaudeCodeError> {
1035 let id_str: String = row.get("id");
1036 let session_id_str: String = row.get("session_id");
1037 let role_str: String = row.get("role");
1038
1039 Ok(Message {
1040 id: Uuid::parse_str(&id_str).map_err(|e| parse_err("message ID", e))?,
1041 session_id: Uuid::parse_str(&session_id_str)
1042 .map_err(|e| parse_err("session ID", e))?,
1043 role: MessageRole::from_str(&role_str)
1044 .ok_or_else(|| parse_err("role", &role_str))?,
1045 content: row.get("content"),
1046 created_at: row.get("created_at"),
1047 model_id: row.get("model_id"),
1048 input_tokens: row.get("input_tokens"),
1049 output_tokens: row.get("output_tokens"),
1050 cost_usd: row.get("cost_usd"),
1051 })
1052 }
1053
1054 fn row_to_config(&self, row: sqlx::sqlite::SqliteRow) -> Result<ClaudeCodeConfig, ClaudeCodeError> {
1055 let id_str: String = row.get("id");
1056 let tree_id_str: String = row.get("tree_id");
1057 let head_str: String = row.get("canonical_head");
1058 let model_str: String = row.get("model");
1059 let metadata_json: Option<String> = row.get("metadata");
1060 let mcp_config_json: Option<String> = row.get("mcp_config");
1061 let loopback: i32 = row.get("loopback_enabled");
1062
1063 let tree_id = TreeId::parse_str(&tree_id_str)
1064 .map_err(|e| parse_err("tree ID", e))?;
1065 let node_id = NodeId::parse_str(&head_str)
1066 .map_err(|e| parse_err("node ID", e))?;
1067 let model = Model::from_str(&model_str)
1068 .ok_or_else(|| parse_err("model", &model_str))?;
1069
1070 Ok(ClaudeCodeConfig {
1071 id: Uuid::parse_str(&id_str).map_err(|e| parse_err("session ID", e))?,
1072 name: row.get("name"),
1073 claude_session_id: row.get("claude_session_id"),
1074 loopback_session_id: row.try_get("loopback_session_id").ok().flatten(),
1075 head: Position::new(tree_id, node_id),
1076 working_dir: row.get("working_dir"),
1077 model,
1078 system_prompt: row.get("system_prompt"),
1079 mcp_config: mcp_config_json.and_then(|s| serde_json::from_str(&s).ok()),
1080 loopback_enabled: loopback != 0,
1081 metadata: metadata_json.and_then(|s| serde_json::from_str(&s).ok()),
1082 created_at: row.get("created_at"),
1083 updated_at: row.get("updated_at"),
1084 })
1085 }
1086}
1087
1088fn current_timestamp() -> i64 {
1090 SystemTime::now()
1091 .duration_since(UNIX_EPOCH)
1092 .unwrap()
1093 .as_secs() as i64
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098 use super::*;
1099
1100 #[tokio::test]
1102 async fn test_stream_buffer_operations() {
1103 let streams: RwLock<HashMap<StreamId, ActiveStreamBuffer>> = RwLock::new(HashMap::new());
1105
1106 let stream_id = StreamId::new_v4();
1108 let session_id = ClaudeCodeId::new_v4();
1109 let now = current_timestamp();
1110
1111 let info = StreamInfo {
1112 stream_id,
1113 session_id,
1114 status: StreamStatus::Running,
1115 user_position: None,
1116 event_count: 0,
1117 read_position: 0,
1118 started_at: now,
1119 ended_at: None,
1120 error: None,
1121 };
1122
1123 let buffer = ActiveStreamBuffer {
1124 info,
1125 events: Vec::new(),
1126 };
1127
1128 streams.write().await.insert(stream_id, buffer);
1129
1130 {
1132 let mut streams = streams.write().await;
1133 let buffer = streams.get_mut(&stream_id).unwrap();
1134
1135 buffer.events.push(BufferedEvent {
1136 seq: 0,
1137 event: ChatEvent::Start {
1138 id: session_id,
1139 user_position: Position::new(TreeId::new(), NodeId::new()),
1140 },
1141 timestamp: now,
1142 });
1143 buffer.info.event_count = 1;
1144
1145 buffer.events.push(BufferedEvent {
1146 seq: 1,
1147 event: ChatEvent::Content { text: "Hello".to_string() },
1148 timestamp: now,
1149 });
1150 buffer.info.event_count = 2;
1151 }
1152
1153 {
1155 let mut streams = streams.write().await;
1156 let buffer = streams.get_mut(&stream_id).unwrap();
1157
1158 let events: Vec<_> = buffer.events.iter().skip(0).take(10).cloned().collect();
1159 assert_eq!(events.len(), 2);
1160 assert_eq!(events[0].seq, 0);
1161 assert_eq!(events[1].seq, 1);
1162
1163 buffer.info.read_position = 2;
1165 }
1166
1167 {
1169 let streams = streams.read().await;
1170 let buffer = streams.get(&stream_id).unwrap();
1171
1172 let events: Vec<_> = buffer.events.iter()
1173 .skip(buffer.info.read_position as usize)
1174 .take(10)
1175 .collect();
1176 assert_eq!(events.len(), 0);
1177 }
1178
1179 {
1181 let mut streams = streams.write().await;
1182 let buffer = streams.get_mut(&stream_id).unwrap();
1183
1184 buffer.events.push(BufferedEvent {
1185 seq: 2,
1186 event: ChatEvent::Content { text: " World".to_string() },
1187 timestamp: now,
1188 });
1189 buffer.info.event_count = 3;
1190 }
1191
1192 {
1194 let mut streams = streams.write().await;
1195 let buffer = streams.get_mut(&stream_id).unwrap();
1196
1197 let events: Vec<_> = buffer.events.iter()
1198 .skip(buffer.info.read_position as usize)
1199 .take(10)
1200 .cloned()
1201 .collect();
1202 assert_eq!(events.len(), 1);
1203 assert_eq!(events[0].seq, 2);
1204
1205 buffer.info.read_position = 3;
1207 }
1208
1209 {
1211 let mut streams = streams.write().await;
1212 let buffer = streams.get_mut(&stream_id).unwrap();
1213
1214 assert_eq!(buffer.info.status, StreamStatus::Running);
1215
1216 buffer.info.status = StreamStatus::AwaitingPermission;
1217 assert_eq!(buffer.info.status, StreamStatus::AwaitingPermission);
1218
1219 buffer.info.status = StreamStatus::Complete;
1220 buffer.info.ended_at = Some(current_timestamp());
1221 assert_eq!(buffer.info.status, StreamStatus::Complete);
1222 assert!(buffer.info.ended_at.is_some());
1223 }
1224 }
1225
1226 #[test]
1227 fn test_stream_status_serialization() {
1228 let status = StreamStatus::AwaitingPermission;
1230 let json = serde_json::to_string(&status).unwrap();
1231 assert_eq!(json, "\"awaiting_permission\"");
1232
1233 let status = StreamStatus::Running;
1234 let json = serde_json::to_string(&status).unwrap();
1235 assert_eq!(json, "\"running\"");
1236 }
1237}
1238
1239#[cfg(test)]
1244mod render_tests {
1245 use super::*;
1246 use crate::activations::arbor::{ArborConfig, ArborError};
1247
1248 async fn create_test_storage() -> (ClaudeCodeStorage, PathBuf) {
1250 let temp_dir = std::env::temp_dir();
1251 let test_id = Uuid::new_v4();
1252 let arbor_path = temp_dir.join(format!("test_arbor_{}.db", test_id));
1253 let claudecode_path = temp_dir.join(format!("test_claudecode_{}.db", test_id));
1254
1255 let arbor_config = ArborConfig {
1256 db_path: arbor_path.clone(),
1257 scheduled_deletion_window: 604800,
1258 archive_window: 2592000,
1259 auto_cleanup: false, cleanup_interval: 3600,
1261 };
1262 let arbor = Arc::new(ArborStorage::new(arbor_config).await.unwrap());
1263
1264 let claudecode_config = ClaudeCodeStorageConfig {
1265 db_path: claudecode_path.clone(),
1266 };
1267 let storage = ClaudeCodeStorage::new(claudecode_config, arbor)
1268 .await
1269 .unwrap();
1270
1271 (storage, arbor_path)
1272 }
1273
1274 async fn create_event_node(
1276 arbor: &ArborStorage,
1277 tree_id: &TreeId,
1278 parent_id: &NodeId,
1279 event: &NodeEvent,
1280 ) -> Result<NodeId, ArborError> {
1281 let content = serde_json::to_string(event).map_err(|e| e.to_string())?;
1282 arbor.node_create_text(tree_id, Some(*parent_id), content, None).await
1283 }
1284
1285 #[tokio::test]
1286 async fn test_render_simple_exchange() {
1287 let (storage, _temp_path) = create_test_storage().await;
1288 let arbor = storage.arbor();
1289
1290 let tree_id = arbor.tree_create(None, "test-session").await.unwrap();
1292 let tree = arbor.tree_get(&tree_id).await.unwrap();
1293 let root = tree.root;
1294
1295 let user_node = create_event_node(
1297 arbor,
1298 &tree_id,
1299 &root,
1300 &NodeEvent::UserMessage {
1301 content: "Hello".to_string(),
1302 },
1303 )
1304 .await
1305 .unwrap();
1306
1307 let assistant_start = create_event_node(
1308 arbor,
1309 &tree_id,
1310 &user_node,
1311 &NodeEvent::AssistantStart,
1312 )
1313 .await
1314 .unwrap();
1315
1316 let content_node = create_event_node(
1317 arbor,
1318 &tree_id,
1319 &assistant_start,
1320 &NodeEvent::ContentText {
1321 text: "Hi there!".to_string(),
1322 },
1323 )
1324 .await
1325 .unwrap();
1326
1327 let complete_node = create_event_node(
1328 arbor,
1329 &tree_id,
1330 &content_node,
1331 &NodeEvent::AssistantComplete { usage: None },
1332 )
1333 .await
1334 .unwrap();
1335
1336 let messages = storage
1338 .render_messages(&tree_id, &root, &complete_node)
1339 .await
1340 .unwrap();
1341
1342 assert_eq!(messages.len(), 2);
1344
1345 assert_eq!(messages[0].role, "user");
1347 assert_eq!(messages[0].content.len(), 1);
1348 if let ContentBlock::Text { text } = &messages[0].content[0] {
1349 assert_eq!(text, "Hello");
1350 } else {
1351 panic!("Expected text content block");
1352 }
1353
1354 assert_eq!(messages[1].role, "assistant");
1356 assert_eq!(messages[1].content.len(), 1);
1357 if let ContentBlock::Text { text } = &messages[1].content[0] {
1358 assert_eq!(text, "Hi there!");
1359 } else {
1360 panic!("Expected text content block");
1361 }
1362 }
1363
1364 #[tokio::test]
1365 async fn test_render_with_tool_use() {
1366 let (storage, _temp_path) = create_test_storage().await;
1367 let arbor = storage.arbor();
1368
1369 let tree_id = arbor.tree_create(None, "test-tool-session").await.unwrap();
1371 let tree = arbor.tree_get(&tree_id).await.unwrap();
1372 let root = tree.root;
1373
1374 let user_node = create_event_node(
1378 arbor,
1379 &tree_id,
1380 &root,
1381 &NodeEvent::UserMessage {
1382 content: "Write a file".to_string(),
1383 },
1384 )
1385 .await
1386 .unwrap();
1387
1388 let assistant_start = create_event_node(
1389 arbor,
1390 &tree_id,
1391 &user_node,
1392 &NodeEvent::AssistantStart,
1393 )
1394 .await
1395 .unwrap();
1396
1397 let text_node = create_event_node(
1398 arbor,
1399 &tree_id,
1400 &assistant_start,
1401 &NodeEvent::ContentText {
1402 text: "I'll write that file.".to_string(),
1403 },
1404 )
1405 .await
1406 .unwrap();
1407
1408 let tool_use_node = create_event_node(
1409 arbor,
1410 &tree_id,
1411 &text_node,
1412 &NodeEvent::ContentToolUse {
1413 id: "tool_123".to_string(),
1414 name: "write_file".to_string(),
1415 input: serde_json::json!({"path": "test.txt", "content": "hello"}),
1416 },
1417 )
1418 .await
1419 .unwrap();
1420
1421 let assistant_complete = create_event_node(
1422 arbor,
1423 &tree_id,
1424 &tool_use_node,
1425 &NodeEvent::AssistantComplete { usage: None },
1426 )
1427 .await
1428 .unwrap();
1429
1430 let tool_result = create_event_node(
1431 arbor,
1432 &tree_id,
1433 &assistant_complete,
1434 &NodeEvent::UserToolResult {
1435 tool_use_id: "tool_123".to_string(),
1436 content: "File written successfully".to_string(),
1437 is_error: false,
1438 },
1439 )
1440 .await
1441 .unwrap();
1442
1443 let assistant_start2 = create_event_node(
1444 arbor,
1445 &tree_id,
1446 &tool_result,
1447 &NodeEvent::AssistantStart,
1448 )
1449 .await
1450 .unwrap();
1451
1452 let content_node2 = create_event_node(
1453 arbor,
1454 &tree_id,
1455 &assistant_start2,
1456 &NodeEvent::ContentText {
1457 text: "Done!".to_string(),
1458 },
1459 )
1460 .await
1461 .unwrap();
1462
1463 let complete_node2 = create_event_node(
1464 arbor,
1465 &tree_id,
1466 &content_node2,
1467 &NodeEvent::AssistantComplete { usage: None },
1468 )
1469 .await
1470 .unwrap();
1471
1472 let messages = storage
1474 .render_messages(&tree_id, &root, &complete_node2)
1475 .await
1476 .unwrap();
1477
1478 assert_eq!(messages.len(), 4, "Expected 4 messages, got {}", messages.len());
1480
1481 assert_eq!(messages[0].role, "user");
1482 assert_eq!(messages[1].role, "assistant");
1483 assert_eq!(messages[1].content.len(), 2); assert_eq!(messages[2].role, "user"); assert_eq!(messages[3].role, "assistant");
1486 }
1487}