1use super::{
2 executor::{ClaudeCodeExecutor, LaunchConfig},
3 sessions,
4 storage::ClaudeCodeStorage,
5 types::*,
6};
7use crate::activations::arbor::{NodeId, TreeId};
8use crate::plexus::{HubContext, NoParent};
9use async_stream::stream;
10use futures::{Stream, StreamExt};
11use plexus_macros::activation;
12use serde_json::Value;
13use std::marker::PhantomData;
14use std::sync::{Arc, OnceLock};
15use tracing::Instrument;
16
17#[derive(Clone)]
24pub struct ClaudeCode<P: HubContext = NoParent> {
25 pub storage: Arc<ClaudeCodeStorage>,
26 executor: ClaudeCodeExecutor,
27 hub: Arc<OnceLock<P>>,
29 _phantom: PhantomData<P>,
30}
31
32impl<P: HubContext> ClaudeCode<P> {
33 pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
35 Self {
36 storage,
37 executor: ClaudeCodeExecutor::new(),
38 hub: Arc::new(OnceLock::new()),
39 _phantom: PhantomData,
40 }
41 }
42
43 pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
45 Self {
46 storage,
47 executor,
48 hub: Arc::new(OnceLock::new()),
49 _phantom: PhantomData,
50 }
51 }
52
53 pub fn inject_parent(&self, parent: P) {
58 let _ = self.hub.set(parent);
59 }
60
61 pub fn has_parent(&self) -> bool {
63 self.hub.get().is_some()
64 }
65
66 pub fn parent(&self) -> Option<&P> {
70 self.hub.get()
71 }
72
73 pub async fn resolve_handle_impl(
78 &self,
79 handle: &crate::types::Handle,
80 ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
81 use crate::plexus::{PlexusError, wrap_stream};
82 use async_stream::stream;
83
84 let storage = self.storage.clone();
85
86 if handle.meta.is_empty() {
89 return Err(PlexusError::ExecutionError(
90 "ClaudeCode handle missing message ID in meta".to_string()
91 ));
92 }
93 let identifier = handle.meta.join(":");
94
95 let name = handle.meta.get(2).cloned();
97
98 let result_stream = stream! {
99 match storage.resolve_message_handle(&identifier).await {
100 Ok(message) => {
101 yield ResolveResult::Message {
102 id: message.id.to_string(),
103 role: message.role.as_str().to_string(),
104 content: message.content,
105 model: message.model_id,
106 name: name.unwrap_or_else(|| message.role.as_str().to_string()),
107 };
108 }
109 Err(e) => {
110 yield ResolveResult::Error {
111 message: format!("Failed to resolve handle: {}", e),
112 };
113 }
114 }
115 };
116
117 Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
118 }
119}
120
121impl ClaudeCode<NoParent> {
123 pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
124 Self::with_context_type(storage)
125 }
126
127 pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
128 Self::with_executor_and_context(storage, executor)
129 }
130}
131
132async fn create_event_node(
138 arbor: &crate::activations::arbor::ArborStorage,
139 tree_id: &crate::activations::arbor::TreeId,
140 parent_id: &crate::activations::arbor::NodeId,
141 event: &NodeEvent,
142) -> Result<crate::activations::arbor::NodeId, String> {
143 let json = serde_json::to_string(event)
144 .map_err(|e| format!("Failed to serialize event: {}", e))?;
145
146 arbor.node_create_text(tree_id, Some(*parent_id), json, None)
147 .await
148 .map_err(|e| e.to_string())
149}
150
151#[plexus_macros::activation(namespace = "claudecode",
152version = "1.0.0",
153description = "Manage Claude Code sessions with Arbor-backed conversation history",
154resolve_handle, crate_path = "plexus_core")]
155impl<P: HubContext> ClaudeCode<P> {
156 #[plexus_macros::method(params(
158 name = "Human-readable name for the session",
159 working_dir = "Working directory for Claude Code",
160 model = "Model to use (opus, sonnet, haiku)",
161 system_prompt = "Optional system prompt / instructions",
162 loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval",
163 loopback_session_id = "Session ID for loopback MCP URL correlation (e.g., orcha-xxx-claude-yyy)"
164 ))]
165 pub async fn create(
166 &self,
167 name: String,
168 working_dir: String,
169 model: Model,
170 system_prompt: Option<String>,
171 loopback_enabled: Option<bool>,
172 loopback_session_id: Option<String>,
173 ) -> impl Stream<Item = CreateResult> + Send + 'static {
174 let storage = self.storage.clone();
175 let loopback = loopback_enabled.unwrap_or(false);
176
177 stream! {
178 let working_dir = match std::fs::canonicalize(&working_dir) {
180 Ok(p) => p.to_string_lossy().into_owned(),
181 Err(e) => {
182 yield CreateResult::Err {
183 message: ClaudeCodeError::PathResolution {
184 path: working_dir,
185 source: e,
186 }.to_string(),
187 };
188 return;
189 }
190 };
191
192 if loopback {
196 if let Err(e) = super::executor::check_mcp_reachable().await {
197 yield CreateResult::Err { message: e };
198 return;
199 }
200 }
201
202 match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None, loopback_session_id, None).await {
204 Ok(config) => {
205 yield CreateResult::Ok {
206 id: config.id,
207 head: config.head,
208 };
209 }
210 Err(e) => {
211 yield CreateResult::Err { message: e.to_string() };
212 }
213 }
214 }
215 }
216
217 #[plexus_macros::method(streaming,
219 params(
220 name = "Session name to chat with",
221 prompt = "User message / prompt to send",
222 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion",
223 allowed_tools = "Optional list of tools to allow (e.g. [\"WebSearch\", \"Read\"])"
224 ))]
225 pub async fn chat(
226 &self,
227 name: String,
228 prompt: String,
229 ephemeral: Option<bool>,
230 allowed_tools: Option<Vec<String>>,
231 ) -> impl Stream<Item = ChatEvent> + Send + 'static {
232 let storage = self.storage.clone();
233 let executor = self.executor.clone();
234
235 let resolve_result = storage.session_get_by_name(&name).await;
237
238 stream! {
239 let is_ephemeral = ephemeral.unwrap_or(false);
240
241 let config = match resolve_result {
243 Ok(c) => c,
244 Err(e) => {
245 yield ChatEvent::Err { message: e.to_string() };
246 return;
247 }
248 };
249
250 let session_id = config.id;
251
252 let user_msg = if is_ephemeral {
254 match storage.message_create_ephemeral(
255 &session_id,
256 MessageRole::User,
257 prompt.clone(),
258 None, None, None, None,
259 ).await {
260 Ok(m) => m,
261 Err(e) => {
262 yield ChatEvent::Err { message: e.to_string() };
263 return;
264 }
265 }
266 } else {
267 match storage.message_create(
268 &session_id,
269 MessageRole::User,
270 prompt.clone(),
271 None, None, None, None,
272 ).await {
273 Ok(m) => m,
274 Err(e) => {
275 yield ChatEvent::Err { message: e.to_string() };
276 return;
277 }
278 }
279 };
280
281 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
283 let user_node_id = if is_ephemeral {
284 match storage.arbor().node_create_external_ephemeral(
285 &config.head.tree_id,
286 Some(config.head.node_id),
287 user_handle,
288 None,
289 ).await {
290 Ok(id) => id,
291 Err(e) => {
292 yield ChatEvent::Err { message: e.to_string() };
293 return;
294 }
295 }
296 } else {
297 match storage.arbor().node_create_external(
298 &config.head.tree_id,
299 Some(config.head.node_id),
300 user_handle,
301 None,
302 ).await {
303 Ok(id) => id,
304 Err(e) => {
305 yield ChatEvent::Err { message: e.to_string() };
306 return;
307 }
308 }
309 };
310
311 let user_position = Position::new(config.head.tree_id, user_node_id);
312
313 let mut current_parent = user_node_id;
315
316 let user_event = NodeEvent::UserMessage { content: prompt.clone() };
318 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &user_event).await {
319 current_parent = node_id;
320 }
321
322 let start_event = NodeEvent::AssistantStart;
324 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &start_event).await {
325 current_parent = node_id;
326 }
327
328 yield ChatEvent::Start {
330 id: session_id,
331 user_position,
332 };
333
334 let launch_config = LaunchConfig {
336 query: prompt,
337 session_id: config.claude_session_id.clone(),
339 fork_session: false,
340 model: config.model,
341 working_dir: config.working_dir.clone(),
342 system_prompt: config.system_prompt.clone(),
343 mcp_config: config.mcp_config.clone(),
344 loopback_enabled: config.loopback_enabled,
345 loopback_session_id: if config.loopback_enabled {
347 config.loopback_session_id.clone()
348 } else {
349 None
350 },
351 allowed_tools: allowed_tools.unwrap_or_default(),
352 ..Default::default()
353 };
354
355 let prev_claude_session_id = config.claude_session_id.clone();
357 let mut response_content = String::new();
358 let mut claude_session_id = config.claude_session_id.clone();
359 let mut cost_usd = None;
360 let mut num_turns = None;
361
362 let mut raw_stream = executor.launch(launch_config).await;
363
364 let mut current_tool_id: Option<String> = None;
366 let mut current_tool_name: Option<String> = None;
367 let mut current_tool_input = String::new();
368
369 while let Some(event) = raw_stream.next().await {
370 match event {
371 RawClaudeEvent::System { session_id: sid, .. } => {
372 if let Some(id) = sid {
373 claude_session_id = Some(id);
374 }
375 }
376 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
377 if let Some(id) = sid {
378 claude_session_id = Some(id);
379 }
380 match inner {
381 StreamEventInner::ContentBlockDelta { delta, .. } => {
382 match delta {
383 StreamDelta::TextDelta { text } => {
384 response_content.push_str(&text);
385
386 let event = NodeEvent::ContentText { text: text.clone() };
388 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
389 current_parent = node_id;
390 }
391
392 yield ChatEvent::Content { text };
393 }
394 StreamDelta::InputJsonDelta { partial_json } => {
395 current_tool_input.push_str(&partial_json);
396 }
397 }
398 }
399 StreamEventInner::ContentBlockStart { content_block, .. } => {
400 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
401 current_tool_id = Some(id);
402 current_tool_name = Some(name);
403 current_tool_input.clear();
404 }
405 }
406 StreamEventInner::ContentBlockStop { .. } => {
407 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
409 let input: Value = serde_json::from_str(¤t_tool_input)
410 .unwrap_or(Value::Object(serde_json::Map::new()));
411
412 let event = NodeEvent::ContentToolUse {
414 id: id.clone(),
415 name: name.clone(),
416 input: input.clone(),
417 };
418 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
419 current_parent = node_id;
420 }
421
422 yield ChatEvent::ToolUse {
423 tool_name: name,
424 tool_use_id: id,
425 input,
426 };
427 current_tool_input.clear();
428 }
429 }
430 _ => {}
431 }
432 }
433 RawClaudeEvent::Assistant { message } => {
434 if let Some(msg) = message {
436 if let Some(content) = msg.content {
437 for block in content {
438 match block {
439 RawContentBlock::Text { text } => {
440 if response_content.is_empty() {
442 response_content.push_str(&text);
443
444 let event = NodeEvent::ContentText { text: text.clone() };
446 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
447 current_parent = node_id;
448 }
449
450 yield ChatEvent::Content { text };
451 }
452 }
453 RawContentBlock::ToolUse { id, name, input } => {
454 let event = NodeEvent::ContentToolUse {
456 id: id.clone(),
457 name: name.clone(),
458 input: input.clone(),
459 };
460 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
461 current_parent = node_id;
462 }
463
464 yield ChatEvent::ToolUse {
465 tool_name: name,
466 tool_use_id: id,
467 input,
468 };
469 }
470 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
471 let event = NodeEvent::UserToolResult {
473 tool_use_id: tool_use_id.clone(),
474 content: content.clone().unwrap_or_default(),
475 is_error: is_error.unwrap_or(false),
476 };
477 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
478 current_parent = node_id;
479 }
480
481 yield ChatEvent::ToolResult {
482 tool_use_id,
483 output: content.unwrap_or_default(),
484 is_error: is_error.unwrap_or(false),
485 };
486 }
487 RawContentBlock::Thinking { thinking, .. } => {
488 let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
490 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
491 current_parent = node_id;
492 }
493
494 yield ChatEvent::Thinking { thinking };
495 }
496 }
497 }
498 }
499 }
500 }
501 RawClaudeEvent::Result {
502 session_id: sid,
503 cost_usd: cost,
504 num_turns: turns,
505 is_error,
506 error,
507 ..
508 } => {
509 if let Some(id) = sid {
510 claude_session_id = Some(id);
511 }
512 cost_usd = cost;
513 num_turns = turns;
514
515 if is_error == Some(true) {
517 if let Some(err_msg) = error {
518 yield ChatEvent::Err { message: err_msg };
519 return;
520 }
521 }
522 }
523 RawClaudeEvent::Unknown { event_type, data } => {
524 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
526 Ok(handle) => {
527 tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
528 yield ChatEvent::Passthrough { event_type, handle, data };
529 }
530 Err(e) => {
531 tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
532 yield ChatEvent::Passthrough {
534 event_type,
535 handle: "storage-failed".to_string(),
536 data,
537 };
538 }
539 }
540 }
541 RawClaudeEvent::User { .. } => {
542 }
544 RawClaudeEvent::LaunchCommand { command } => {
545 let event = NodeEvent::LaunchCommand { command };
546 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
547 current_parent = node_id;
548 }
549 }
550 RawClaudeEvent::Stderr { text } => {
551 tracing::warn!(stderr = %text, "Claude stderr");
552 let event = NodeEvent::ClaudeStderr { text };
553 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
554 current_parent = node_id;
555 }
556 }
557 }
558 }
559
560 if let Some(ref new_id) = claude_session_id {
562 if prev_claude_session_id.as_deref() != Some(new_id.as_str()) {
563 let _ = storage.session_update_claude_id(&session_id, new_id.clone()).await;
564 }
565 }
566
567 if response_content.is_empty() && claude_session_id.is_none() {
569 yield ChatEvent::Err {
570 message: "Claude process produced no response. Check substrate logs for details.".to_string(),
571 };
572 return;
573 }
574
575 let model_id = format!("claude-code-{}", config.model.as_str());
577 let assistant_msg = if is_ephemeral {
578 match storage.message_create_ephemeral(
579 &session_id,
580 MessageRole::Assistant,
581 response_content,
582 Some(model_id),
583 None,
584 None,
585 cost_usd,
586 ).await {
587 Ok(m) => m,
588 Err(e) => {
589 yield ChatEvent::Err { message: e.to_string() };
590 return;
591 }
592 }
593 } else {
594 match storage.message_create(
595 &session_id,
596 MessageRole::Assistant,
597 response_content,
598 Some(model_id),
599 None,
600 None,
601 cost_usd,
602 ).await {
603 Ok(m) => m,
604 Err(e) => {
605 yield ChatEvent::Err { message: e.to_string() };
606 return;
607 }
608 }
609 };
610
611 let complete_event = NodeEvent::AssistantComplete { usage: None };
613 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &complete_event).await {
614 current_parent = node_id;
615 }
616
617 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
619 let assistant_node_id = if is_ephemeral {
620 match storage.arbor().node_create_external_ephemeral(
621 &config.head.tree_id,
622 Some(current_parent),
623 assistant_handle,
624 None,
625 ).await {
626 Ok(id) => id,
627 Err(e) => {
628 yield ChatEvent::Err { message: e.to_string() };
629 return;
630 }
631 }
632 } else {
633 match storage.arbor().node_create_external(
634 &config.head.tree_id,
635 Some(current_parent),
636 assistant_handle,
637 None,
638 ).await {
639 Ok(id) => id,
640 Err(e) => {
641 yield ChatEvent::Err { message: e.to_string() };
642 return;
643 }
644 }
645 };
646
647 let new_head = Position::new(config.head.tree_id, assistant_node_id);
648
649 if !is_ephemeral {
651 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
652 yield ChatEvent::Err { message: e.to_string() };
653 return;
654 }
655 }
656
657 yield ChatEvent::Complete {
660 new_head: if is_ephemeral { config.head } else { new_head },
661 claude_session_id: claude_session_id.unwrap_or_default(),
662 usage: Some(ChatUsage {
663 input_tokens: None,
664 output_tokens: None,
665 cost_usd,
666 num_turns,
667 }),
668 };
669 }
670 }
671
672 #[plexus_macros::method]
674 async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
675 let result = self.storage.session_get_by_name(&name).await;
676
677 stream! {
678 match result {
679 Ok(config) => {
680 yield GetResult::Ok { config };
681 }
682 Err(e) => {
683 yield GetResult::Err { message: e.to_string() };
684 }
685 }
686 }
687 }
688
689 #[plexus_macros::method]
691 async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
692 let storage = self.storage.clone();
693
694 stream! {
695 match storage.session_list().await {
696 Ok(sessions) => {
697 yield ListResult::Ok { sessions };
698 }
699 Err(e) => {
700 yield ListResult::Err { message: e.to_string() };
701 }
702 }
703 }
704 }
705
706 #[plexus_macros::method]
708 async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
709 let storage = self.storage.clone();
710 let resolve_result = storage.session_get_by_name(&name).await;
711
712 stream! {
713 let config = match resolve_result {
714 Ok(c) => c,
715 Err(e) => {
716 yield DeleteResult::Err { message: e.to_string() };
717 return;
718 }
719 };
720
721 match storage.session_delete(&config.id).await {
722 Ok(_) => {
723 yield DeleteResult::Ok { id: config.id };
724 }
725 Err(e) => {
726 yield DeleteResult::Err { message: e.to_string() };
727 }
728 }
729 }
730 }
731
732 #[plexus_macros::method]
734 async fn fork(
735 &self,
736 name: String,
737 new_name: String,
738 ) -> impl Stream<Item = ForkResult> + Send + 'static {
739 let storage = self.storage.clone();
740 let resolve_result = storage.session_get_by_name(&name).await;
741
742 stream! {
743 let parent = match resolve_result {
745 Ok(c) => c,
746 Err(e) => {
747 yield ForkResult::Err { message: e.to_string() };
748 return;
749 }
750 };
751
752 let new_config = match storage.session_create(
755 new_name,
756 parent.working_dir.clone(),
757 parent.model,
758 parent.system_prompt.clone(),
759 parent.mcp_config.clone(),
760 parent.loopback_enabled,
761 None, None, None, ).await {
765 Ok(mut c) => {
766 if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
769 yield ForkResult::Err { message: e.to_string() };
770 return;
771 }
772 c.head = parent.head;
773 c
774 }
775 Err(e) => {
776 yield ForkResult::Err { message: e.to_string() };
777 return;
778 }
779 };
780
781 yield ForkResult::Ok {
782 id: new_config.id,
783 head: new_config.head,
784 };
785 }
786 }
787
788 #[plexus_macros::method(params(
793 name = "Session name to chat with",
794 prompt = "User message / prompt to send",
795 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
796 ))]
797 async fn chat_async(
798 &self,
799 name: String,
800 prompt: String,
801 ephemeral: Option<bool>,
802 ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
803 let storage = self.storage.clone();
804 let executor = self.executor.clone();
805
806 let resolve_result = storage.session_get_by_name(&name).await;
808
809 stream! {
810 let is_ephemeral = ephemeral.unwrap_or(false);
811
812 let config = match resolve_result {
814 Ok(c) => c,
815 Err(e) => {
816 yield ChatStartResult::Err { message: e.to_string() };
817 return;
818 }
819 };
820
821 let session_id = config.id;
822
823 let stream_id = match storage.stream_create(session_id).await {
825 Ok(id) => id,
826 Err(e) => {
827 yield ChatStartResult::Err { message: e.to_string() };
828 return;
829 }
830 };
831
832 let storage_bg = storage.clone();
834 let executor_bg = executor.clone();
835 let prompt_bg = prompt.clone();
836 let config_bg = config.clone();
837 let stream_id_bg = stream_id;
838
839 tokio::spawn(async move {
840 Self::run_chat_background(
841 storage_bg,
842 executor_bg,
843 config_bg,
844 prompt_bg,
845 is_ephemeral,
846 stream_id_bg,
847 ).await;
848 }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
849
850 yield ChatStartResult::Ok {
852 stream_id,
853 session_id,
854 };
855 }
856 }
857
858 #[plexus_macros::method(params(
863 stream_id = "Stream ID returned from chat_async",
864 from_seq = "Optional: start reading from this sequence number",
865 limit = "Optional: max events to return (default 100)"
866 ))]
867 async fn poll(
868 &self,
869 stream_id: StreamId,
870 from_seq: Option<u64>,
871 limit: Option<u64>,
872 ) -> impl Stream<Item = PollResult> + Send + 'static {
873 let storage = self.storage.clone();
874
875 stream! {
876 let limit_usize = limit.map(|l| l as usize);
877
878 match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
879 Ok((info, events)) => {
880 let has_more = info.read_position < info.event_count;
881 yield PollResult::Ok {
882 status: info.status,
883 events,
884 read_position: info.read_position,
885 total_events: info.event_count,
886 has_more,
887 };
888 }
889 Err(e) => {
890 yield PollResult::Err { message: e.to_string() };
891 }
892 }
893 }
894 }
895
896 #[plexus_macros::method(params(
900 session_id = "Optional: filter by session ID"
901 ))]
902 async fn streams(
903 &self,
904 session_id: Option<ClaudeCodeId>,
905 ) -> impl Stream<Item = StreamListResult> + Send + 'static {
906 let storage = self.storage.clone();
907
908 stream! {
909 let streams = if let Some(sid) = session_id {
910 storage.stream_list_for_session(&sid).await
911 } else {
912 storage.stream_list().await
913 };
914
915 yield StreamListResult::Ok { streams };
916 }
917 }
918
919 #[plexus_macros::method(params(
921 name = "Session name"
922 ))]
923 async fn get_tree(
924 &self,
925 name: String,
926 ) -> impl Stream<Item = GetTreeResult> + Send + 'static {
927 let storage = self.storage.clone();
928
929 stream! {
930 let config = match storage.session_get_by_name(&name).await {
931 Ok(c) => c,
932 Err(e) => {
933 yield GetTreeResult::Err { message: e.to_string() };
934 return;
935 }
936 };
937
938 yield GetTreeResult::Ok {
939 tree_id: config.head.tree_id,
940 head: config.head.node_id,
941 };
942 }
943 }
944
945 #[plexus_macros::method(params(
947 name = "Session name",
948 start = "Optional start node (default: root)",
949 end = "Optional end node (default: head)"
950 ))]
951 async fn render_context(
952 &self,
953 name: String,
954 start: Option<NodeId>,
955 end: Option<NodeId>,
956 ) -> impl Stream<Item = RenderResult> + Send + 'static {
957 let storage = self.storage.clone();
958
959 stream! {
960 let config = match storage.session_get_by_name(&name).await {
962 Ok(c) => c,
963 Err(e) => {
964 yield RenderResult::Err { message: e.to_string() };
965 return;
966 }
967 };
968
969 let tree_id = config.head.tree_id;
970 let end_node = end.unwrap_or(config.head.node_id);
971
972 let start_node = if let Some(s) = start {
974 s
975 } else {
976 match storage.arbor().tree_get(&tree_id).await {
977 Ok(tree) => tree.root,
978 Err(e) => {
979 yield RenderResult::Err { message: e.to_string() };
980 return;
981 }
982 }
983 };
984
985 let messages = match storage.render_messages(&tree_id, &start_node, &end_node).await {
987 Ok(m) => m,
988 Err(e) => {
989 yield RenderResult::Err { message: e.to_string() };
990 return;
991 }
992 };
993
994 yield RenderResult::Ok { messages };
995 }
996 }
997
998 #[plexus_macros::method(params(
1000 project_path = "Project path (e.g., '-workspace-hypermemetic-hub-codegen')"
1001 ))]
1002 async fn sessions_list(
1003 &self,
1004 project_path: String,
1005 ) -> impl Stream<Item = SessionsListResult> + Send + 'static {
1006 stream! {
1007 match sessions::list_sessions(&project_path).await {
1008 Ok(sessions) => {
1009 yield SessionsListResult::Ok { sessions };
1010 }
1011 Err(e) => {
1012 yield SessionsListResult::Err { message: e };
1013 }
1014 }
1015 }
1016 }
1017
1018 #[plexus_macros::method(params(
1020 project_path = "Project path",
1021 session_id = "Session ID (UUID)"
1022 ))]
1023 async fn sessions_get(
1024 &self,
1025 project_path: String,
1026 session_id: String,
1027 ) -> impl Stream<Item = SessionsGetResult> + Send + 'static {
1028 stream! {
1029 match sessions::read_session(&project_path, &session_id).await {
1030 Ok(events) => {
1031 let event_count = events.len();
1032 let events_json: Vec<serde_json::Value> = events.into_iter()
1034 .filter_map(|e| serde_json::to_value(e).ok())
1035 .collect();
1036
1037 yield SessionsGetResult::Ok {
1038 session_id: session_id.clone(),
1039 event_count,
1040 events: events_json,
1041 };
1042 }
1043 Err(e) => {
1044 yield SessionsGetResult::Err { message: e };
1045 }
1046 }
1047 }
1048 }
1049
1050 #[plexus_macros::method(params(
1052 project_path = "Project path",
1053 session_id = "Session ID to import",
1054 owner_id = "Owner ID for the new tree (default: 'claudecode')"
1055 ))]
1056 async fn sessions_import(
1057 &self,
1058 project_path: String,
1059 session_id: String,
1060 owner_id: Option<String>,
1061 ) -> impl Stream<Item = SessionsImportResult> + Send + 'static {
1062 let storage = self.storage.clone();
1063
1064 stream! {
1065 let owner = owner_id.unwrap_or_else(|| "claudecode".to_string());
1066
1067 match sessions::import_to_arbor(storage.arbor(), &project_path, &session_id, &owner).await {
1068 Ok(tree_id) => {
1069 yield SessionsImportResult::Ok {
1070 tree_id,
1071 session_id,
1072 };
1073 }
1074 Err(e) => {
1075 yield SessionsImportResult::Err { message: e };
1076 }
1077 }
1078 }
1079 }
1080
1081 #[plexus_macros::method(params(
1083 tree_id = "Arbor tree ID to export",
1084 project_path = "Project path",
1085 session_id = "Session ID for the exported file"
1086 ))]
1087 async fn sessions_export(
1088 &self,
1089 tree_id: TreeId,
1090 project_path: String,
1091 session_id: String,
1092 ) -> impl Stream<Item = SessionsExportResult> + Send + 'static {
1093 let storage = self.storage.clone();
1094
1095 stream! {
1096 match sessions::export_from_arbor(storage.arbor(), &tree_id, &project_path, &session_id).await {
1097 Ok(()) => {
1098 yield SessionsExportResult::Ok {
1099 tree_id,
1100 session_id,
1101 };
1102 }
1103 Err(e) => {
1104 yield SessionsExportResult::Err { message: e };
1105 }
1106 }
1107 }
1108 }
1109
1110 #[plexus_macros::method(params(
1112 project_path = "Project path",
1113 session_id = "Session ID to delete"
1114 ))]
1115 async fn sessions_delete(
1116 &self,
1117 project_path: String,
1118 session_id: String,
1119 ) -> impl Stream<Item = SessionsDeleteResult> + Send + 'static {
1120 stream! {
1121 match sessions::delete_session(&project_path, &session_id).await {
1122 Ok(()) => {
1123 yield SessionsDeleteResult::Ok {
1124 session_id,
1125 deleted: true,
1126 };
1127 }
1128 Err(e) => {
1129 yield SessionsDeleteResult::Err { message: e };
1130 }
1131 }
1132 }
1133 }
1134}
1135
1136impl<P: HubContext> ClaudeCode<P> {
1138 async fn run_chat_background(
1140 storage: Arc<ClaudeCodeStorage>,
1141 executor: ClaudeCodeExecutor,
1142 config: ClaudeCodeConfig,
1143 prompt: String,
1144 is_ephemeral: bool,
1145 stream_id: StreamId,
1146 ) {
1147 let session_id = config.id;
1148
1149 let user_msg = if is_ephemeral {
1151 match storage.message_create_ephemeral(
1152 &session_id,
1153 MessageRole::User,
1154 prompt.clone(),
1155 None, None, None, None,
1156 ).await {
1157 Ok(m) => m,
1158 Err(e) => {
1159 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1160 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1161 }
1162 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1163 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1164 }
1165 return;
1166 }
1167 }
1168 } else {
1169 match storage.message_create(
1170 &session_id,
1171 MessageRole::User,
1172 prompt.clone(),
1173 None, None, None, None,
1174 ).await {
1175 Ok(m) => m,
1176 Err(e) => {
1177 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1178 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1179 }
1180 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1181 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1182 }
1183 return;
1184 }
1185 }
1186 };
1187
1188 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
1190 let user_node_id = if is_ephemeral {
1191 match storage.arbor().node_create_external_ephemeral(
1192 &config.head.tree_id,
1193 Some(config.head.node_id),
1194 user_handle,
1195 None,
1196 ).await {
1197 Ok(id) => id,
1198 Err(e) => {
1199 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1200 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1201 }
1202 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1203 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1204 }
1205 return;
1206 }
1207 }
1208 } else {
1209 match storage.arbor().node_create_external(
1210 &config.head.tree_id,
1211 Some(config.head.node_id),
1212 user_handle,
1213 None,
1214 ).await {
1215 Ok(id) => id,
1216 Err(e) => {
1217 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1218 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1219 }
1220 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1221 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1222 }
1223 return;
1224 }
1225 }
1226 };
1227
1228 let user_position = Position::new(config.head.tree_id, user_node_id);
1229
1230 let mut current_parent = user_node_id;
1232
1233 let user_event = NodeEvent::UserMessage { content: prompt.clone() };
1235 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &user_event).await {
1236 current_parent = node_id;
1237 }
1238
1239 let start_event = NodeEvent::AssistantStart;
1241 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &start_event).await {
1242 current_parent = node_id;
1243 }
1244
1245 if let Err(e) = storage.stream_set_user_position(&stream_id, user_position).await {
1247 tracing::error!(stream_id = %stream_id, error = %e, "Failed to set user position on stream");
1248 }
1249
1250 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Start {
1252 id: session_id,
1253 user_position,
1254 }).await {
1255 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1256 }
1257
1258 let launch_config = LaunchConfig {
1260 query: prompt,
1261 session_id: config.claude_session_id.clone(),
1262 fork_session: false,
1263 model: config.model,
1264 working_dir: config.working_dir.clone(),
1265 system_prompt: config.system_prompt.clone(),
1266 mcp_config: config.mcp_config.clone(),
1267 loopback_enabled: config.loopback_enabled,
1268 loopback_session_id: if config.loopback_enabled {
1270 config.claude_session_id.clone()
1271 } else {
1272 None
1273 },
1274 ..Default::default()
1275 };
1276
1277 let mut response_content = String::new();
1279 let mut claude_session_id = config.claude_session_id.clone();
1280 let mut cost_usd = None;
1281 let mut num_turns = None;
1282
1283 let mut raw_stream = executor.launch(launch_config).await;
1284
1285 let mut current_tool_id: Option<String> = None;
1287 let mut current_tool_name: Option<String> = None;
1288 let mut current_tool_input = String::new();
1289
1290 while let Some(event) = raw_stream.next().await {
1291 match event {
1292 RawClaudeEvent::System { session_id: sid, .. } => {
1293 if let Some(id) = sid {
1294 claude_session_id = Some(id);
1295 }
1296 }
1297 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
1298 if let Some(id) = sid {
1299 claude_session_id = Some(id);
1300 }
1301 match inner {
1302 StreamEventInner::ContentBlockDelta { delta, .. } => {
1303 match delta {
1304 StreamDelta::TextDelta { text } => {
1305 response_content.push_str(&text);
1306
1307 let event = NodeEvent::ContentText { text: text.clone() };
1309 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1310 current_parent = node_id;
1311 }
1312
1313 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await {
1314 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1315 }
1316 }
1317 StreamDelta::InputJsonDelta { partial_json } => {
1318 current_tool_input.push_str(&partial_json);
1319 }
1320 }
1321 }
1322 StreamEventInner::ContentBlockStart { content_block, .. } => {
1323 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
1324 current_tool_id = Some(id);
1325 current_tool_name = Some(name);
1326 current_tool_input.clear();
1327 }
1328 }
1329 StreamEventInner::ContentBlockStop { .. } => {
1330 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
1331 let input: Value = serde_json::from_str(¤t_tool_input)
1332 .unwrap_or(Value::Object(serde_json::Map::new()));
1333
1334 if name == "mcp__plexus__loopback_permit" {
1336 if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await {
1337 tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1338 }
1339 }
1340
1341 let event = NodeEvent::ContentToolUse {
1343 id: id.clone(),
1344 name: name.clone(),
1345 input: input.clone(),
1346 };
1347 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1348 current_parent = node_id;
1349 }
1350
1351 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1352 tool_name: name,
1353 tool_use_id: id,
1354 input,
1355 }).await {
1356 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1357 }
1358 current_tool_input.clear();
1359 }
1360 }
1361 StreamEventInner::MessageDelta { delta } => {
1362 if delta.stop_reason == Some("tool_use".to_string()) {
1364 }
1366 }
1367 _ => {}
1368 }
1369 }
1370 RawClaudeEvent::Assistant { message } => {
1371 if let Some(msg) = message {
1372 if let Some(content) = msg.content {
1373 for block in content {
1374 match block {
1375 RawContentBlock::Text { text } => {
1376 if response_content.is_empty() {
1377 response_content.push_str(&text);
1378
1379 let event = NodeEvent::ContentText { text: text.clone() };
1381 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1382 current_parent = node_id;
1383 }
1384
1385 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await {
1386 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1387 }
1388 }
1389 }
1390 RawContentBlock::ToolUse { id, name, input } => {
1391 let event = NodeEvent::ContentToolUse {
1393 id: id.clone(),
1394 name: name.clone(),
1395 input: input.clone(),
1396 };
1397 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1398 current_parent = node_id;
1399 }
1400
1401 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1402 tool_name: name,
1403 tool_use_id: id,
1404 input,
1405 }).await {
1406 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1407 }
1408 }
1409 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
1410 if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await {
1412 tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1413 }
1414
1415 let event = NodeEvent::UserToolResult {
1417 tool_use_id: tool_use_id.clone(),
1418 content: content.clone().unwrap_or_default(),
1419 is_error: is_error.unwrap_or(false),
1420 };
1421 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1422 current_parent = node_id;
1423 }
1424
1425 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
1426 tool_use_id,
1427 output: content.unwrap_or_default(),
1428 is_error: is_error.unwrap_or(false),
1429 }).await {
1430 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1431 }
1432 }
1433 RawContentBlock::Thinking { thinking, .. } => {
1434 let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
1436 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1437 current_parent = node_id;
1438 }
1439
1440 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await {
1441 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1442 }
1443 }
1444 }
1445 }
1446 }
1447 }
1448 }
1449 RawClaudeEvent::Result {
1450 session_id: sid,
1451 cost_usd: cost,
1452 num_turns: turns,
1453 is_error,
1454 error,
1455 ..
1456 } => {
1457 if let Some(id) = sid {
1458 claude_session_id = Some(id);
1459 }
1460 cost_usd = cost;
1461 num_turns = turns;
1462
1463 if is_error == Some(true) {
1464 if let Some(err_msg) = error {
1465 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await {
1466 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push error event to stream");
1467 }
1468 if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await {
1469 tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status to Failed");
1470 }
1471 return;
1472 }
1473 }
1474 }
1475 RawClaudeEvent::Unknown { event_type, data } => {
1476 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1477 Ok(handle) => {
1478 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await {
1479 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1480 }
1481 }
1482 Err(_) => {
1483 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1484 event_type,
1485 handle: "storage-failed".to_string(),
1486 data,
1487 }).await {
1488 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1489 }
1490 }
1491 }
1492 }
1493 RawClaudeEvent::User { .. } => {}
1494 RawClaudeEvent::LaunchCommand { command } => {
1495 let event = NodeEvent::LaunchCommand { command };
1496 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1497 current_parent = node_id;
1498 }
1499 }
1500 RawClaudeEvent::Stderr { text } => {
1501 tracing::warn!(stderr = %text, "Claude stderr");
1502 let event = NodeEvent::ClaudeStderr { text };
1503 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1504 current_parent = node_id;
1505 }
1506 }
1507 }
1508 }
1509
1510 let model_id = format!("claude-code-{}", config.model.as_str());
1512 let assistant_msg = if is_ephemeral {
1513 match storage.message_create_ephemeral(
1514 &session_id,
1515 MessageRole::Assistant,
1516 response_content,
1517 Some(model_id),
1518 None,
1519 None,
1520 cost_usd,
1521 ).await {
1522 Ok(m) => m,
1523 Err(e) => {
1524 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1525 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1526 }
1527 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1528 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1529 }
1530 return;
1531 }
1532 }
1533 } else {
1534 match storage.message_create(
1535 &session_id,
1536 MessageRole::Assistant,
1537 response_content,
1538 Some(model_id),
1539 None,
1540 None,
1541 cost_usd,
1542 ).await {
1543 Ok(m) => m,
1544 Err(e) => {
1545 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1546 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1547 }
1548 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1549 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1550 }
1551 return;
1552 }
1553 }
1554 };
1555
1556 let complete_event = NodeEvent::AssistantComplete { usage: None };
1558 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &complete_event).await {
1559 current_parent = node_id;
1560 }
1561
1562 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1564 let assistant_node_id = if is_ephemeral {
1565 match storage.arbor().node_create_external_ephemeral(
1566 &config.head.tree_id,
1567 Some(current_parent),
1568 assistant_handle,
1569 None,
1570 ).await {
1571 Ok(id) => id,
1572 Err(e) => {
1573 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1574 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1575 }
1576 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1577 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1578 }
1579 return;
1580 }
1581 }
1582 } else {
1583 match storage.arbor().node_create_external(
1584 &config.head.tree_id,
1585 Some(current_parent),
1586 assistant_handle,
1587 None,
1588 ).await {
1589 Ok(id) => id,
1590 Err(e) => {
1591 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1592 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1593 }
1594 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1595 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1596 }
1597 return;
1598 }
1599 }
1600 };
1601
1602 let new_head = Position::new(config.head.tree_id, assistant_node_id);
1603
1604 if !is_ephemeral {
1606 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1607 if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1608 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1609 }
1610 if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1611 tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1612 }
1613 return;
1614 }
1615 }
1616
1617 if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1619 new_head: if is_ephemeral { config.head } else { new_head },
1620 claude_session_id: claude_session_id.unwrap_or_default(),
1621 usage: Some(ChatUsage {
1622 input_tokens: None,
1623 output_tokens: None,
1624 cost_usd,
1625 num_turns,
1626 }),
1627 }).await {
1628 tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1629 }
1630
1631 if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await {
1632 tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1633 }
1634 }
1635}