1use super::{
2 executor::{ClaudeCodeExecutor, LaunchConfig},
3 storage::ClaudeCodeStorage,
4 types::*,
5};
6use crate::plexus::{HubContext, NoParent};
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use plexus_macros::hub_methods;
10use serde_json::Value;
11use std::marker::PhantomData;
12use std::sync::{Arc, OnceLock};
13use tracing::Instrument;
14
15#[derive(Clone)]
22pub struct ClaudeCode<P: HubContext = NoParent> {
23 storage: Arc<ClaudeCodeStorage>,
24 executor: ClaudeCodeExecutor,
25 hub: Arc<OnceLock<P>>,
27 _phantom: PhantomData<P>,
28}
29
30impl<P: HubContext> ClaudeCode<P> {
31 pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
33 Self {
34 storage,
35 executor: ClaudeCodeExecutor::new(),
36 hub: Arc::new(OnceLock::new()),
37 _phantom: PhantomData,
38 }
39 }
40
41 pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
43 Self {
44 storage,
45 executor,
46 hub: Arc::new(OnceLock::new()),
47 _phantom: PhantomData,
48 }
49 }
50
51 pub fn inject_parent(&self, parent: P) {
56 let _ = self.hub.set(parent);
57 }
58
59 pub fn has_parent(&self) -> bool {
61 self.hub.get().is_some()
62 }
63
64 pub fn parent(&self) -> Option<&P> {
68 self.hub.get()
69 }
70
71 pub async fn resolve_handle_impl(
76 &self,
77 handle: &crate::types::Handle,
78 ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
79 use crate::plexus::{PlexusError, wrap_stream};
80 use async_stream::stream;
81
82 let storage = self.storage.clone();
83
84 if handle.meta.is_empty() {
87 return Err(PlexusError::ExecutionError(
88 "ClaudeCode handle missing message ID in meta".to_string()
89 ));
90 }
91 let identifier = handle.meta.join(":");
92
93 let name = handle.meta.get(2).cloned();
95
96 let result_stream = stream! {
97 match storage.resolve_message_handle(&identifier).await {
98 Ok(message) => {
99 yield ResolveResult::Message {
100 id: message.id.to_string(),
101 role: message.role.as_str().to_string(),
102 content: message.content,
103 model: message.model_id,
104 name: name.unwrap_or_else(|| message.role.as_str().to_string()),
105 };
106 }
107 Err(e) => {
108 yield ResolveResult::Error {
109 message: format!("Failed to resolve handle: {}", e.message),
110 };
111 }
112 }
113 };
114
115 Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
116 }
117}
118
119impl ClaudeCode<NoParent> {
121 pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
122 Self::with_context_type(storage)
123 }
124
125 pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
126 Self::with_executor_and_context(storage, executor)
127 }
128}
129
130#[hub_methods(
131 namespace = "claudecode",
132 version = "1.0.0",
133 description = "Manage Claude Code sessions with Arbor-backed conversation history",
134 resolve_handle
135)]
136impl<P: HubContext> ClaudeCode<P> {
137 #[plexus_macros::hub_method(params(
139 name = "Human-readable name for the session",
140 working_dir = "Working directory for Claude Code",
141 model = "Model to use (opus, sonnet, haiku)",
142 system_prompt = "Optional system prompt / instructions",
143 loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval"
144 ))]
145 async fn create(
146 &self,
147 name: String,
148 working_dir: String,
149 model: Model,
150 system_prompt: Option<String>,
151 loopback_enabled: Option<bool>,
152 ) -> impl Stream<Item = CreateResult> + Send + 'static {
153 let storage = self.storage.clone();
154 let loopback = loopback_enabled.unwrap_or(false);
155
156 stream! {
157 match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None).await {
158 Ok(config) => {
159 yield CreateResult::Ok {
160 id: config.id,
161 head: config.head,
162 };
163 }
164 Err(e) => {
165 yield CreateResult::Err { message: e.to_string() };
166 }
167 }
168 }
169 }
170
171 #[plexus_macros::hub_method(
173 streaming,
174 params(
175 name = "Session name to chat with",
176 prompt = "User message / prompt to send",
177 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
178 )
179 )]
180 async fn chat(
181 &self,
182 name: String,
183 prompt: String,
184 ephemeral: Option<bool>,
185 ) -> impl Stream<Item = ChatEvent> + Send + 'static {
186 let storage = self.storage.clone();
187 let executor = self.executor.clone();
188
189 let resolve_result = storage.session_get_by_name(&name).await;
191
192 stream! {
193 let is_ephemeral = ephemeral.unwrap_or(false);
194
195 let config = match resolve_result {
197 Ok(c) => c,
198 Err(e) => {
199 yield ChatEvent::Err { message: e.to_string() };
200 return;
201 }
202 };
203
204 let session_id = config.id;
205
206 let user_msg = if is_ephemeral {
208 match storage.message_create_ephemeral(
209 &session_id,
210 MessageRole::User,
211 prompt.clone(),
212 None, None, None, None,
213 ).await {
214 Ok(m) => m,
215 Err(e) => {
216 yield ChatEvent::Err { message: e.to_string() };
217 return;
218 }
219 }
220 } else {
221 match storage.message_create(
222 &session_id,
223 MessageRole::User,
224 prompt.clone(),
225 None, None, None, None,
226 ).await {
227 Ok(m) => m,
228 Err(e) => {
229 yield ChatEvent::Err { message: e.to_string() };
230 return;
231 }
232 }
233 };
234
235 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
237 let user_node_id = if is_ephemeral {
238 match storage.arbor().node_create_external_ephemeral(
239 &config.head.tree_id,
240 Some(config.head.node_id),
241 user_handle,
242 None,
243 ).await {
244 Ok(id) => id,
245 Err(e) => {
246 yield ChatEvent::Err { message: e.to_string() };
247 return;
248 }
249 }
250 } else {
251 match storage.arbor().node_create_external(
252 &config.head.tree_id,
253 Some(config.head.node_id),
254 user_handle,
255 None,
256 ).await {
257 Ok(id) => id,
258 Err(e) => {
259 yield ChatEvent::Err { message: e.to_string() };
260 return;
261 }
262 }
263 };
264
265 let user_position = Position::new(config.head.tree_id, user_node_id);
266
267 yield ChatEvent::Start {
269 id: session_id,
270 user_position,
271 };
272
273 let launch_config = LaunchConfig {
275 query: prompt,
276 session_id: config.claude_session_id.clone(),
277 fork_session: false,
278 model: config.model,
279 working_dir: config.working_dir.clone(),
280 system_prompt: config.system_prompt.clone(),
281 mcp_config: config.mcp_config.clone(),
282 loopback_enabled: config.loopback_enabled,
283 loopback_session_id: if config.loopback_enabled {
284 Some(session_id.to_string())
285 } else {
286 None
287 },
288 ..Default::default()
289 };
290
291 let mut response_content = String::new();
293 let mut claude_session_id = config.claude_session_id.clone();
294 let mut cost_usd = None;
295 let mut num_turns = None;
296
297 let mut raw_stream = executor.launch(launch_config).await;
298
299 let mut current_tool_id: Option<String> = None;
301 let mut current_tool_name: Option<String> = None;
302 let mut current_tool_input = String::new();
303
304 while let Some(event) = raw_stream.next().await {
305 match event {
306 RawClaudeEvent::System { session_id: sid, .. } => {
307 if let Some(id) = sid {
308 claude_session_id = Some(id);
309 }
310 }
311 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
312 if let Some(id) = sid {
313 claude_session_id = Some(id);
314 }
315 match inner {
316 StreamEventInner::ContentBlockDelta { delta, .. } => {
317 match delta {
318 StreamDelta::TextDelta { text } => {
319 response_content.push_str(&text);
320 yield ChatEvent::Content { text };
321 }
322 StreamDelta::InputJsonDelta { partial_json } => {
323 current_tool_input.push_str(&partial_json);
324 }
325 }
326 }
327 StreamEventInner::ContentBlockStart { content_block, .. } => {
328 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
329 current_tool_id = Some(id);
330 current_tool_name = Some(name);
331 current_tool_input.clear();
332 }
333 }
334 StreamEventInner::ContentBlockStop { .. } => {
335 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
337 let input: Value = serde_json::from_str(¤t_tool_input)
338 .unwrap_or(Value::Object(serde_json::Map::new()));
339 yield ChatEvent::ToolUse {
340 tool_name: name,
341 tool_use_id: id,
342 input,
343 };
344 current_tool_input.clear();
345 }
346 }
347 _ => {}
348 }
349 }
350 RawClaudeEvent::Assistant { message } => {
351 if let Some(msg) = message {
353 if let Some(content) = msg.content {
354 for block in content {
355 match block {
356 RawContentBlock::Text { text } => {
357 if response_content.is_empty() {
359 response_content.push_str(&text);
360 yield ChatEvent::Content { text };
361 }
362 }
363 RawContentBlock::ToolUse { id, name, input } => {
364 yield ChatEvent::ToolUse {
365 tool_name: name,
366 tool_use_id: id,
367 input,
368 };
369 }
370 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
371 yield ChatEvent::ToolResult {
372 tool_use_id,
373 output: content.unwrap_or_default(),
374 is_error: is_error.unwrap_or(false),
375 };
376 }
377 RawContentBlock::Thinking { thinking, .. } => {
378 yield ChatEvent::Thinking { thinking };
379 }
380 }
381 }
382 }
383 }
384 }
385 RawClaudeEvent::Result {
386 session_id: sid,
387 cost_usd: cost,
388 num_turns: turns,
389 is_error,
390 error,
391 ..
392 } => {
393 if let Some(id) = sid {
394 claude_session_id = Some(id);
395 }
396 cost_usd = cost;
397 num_turns = turns;
398
399 if is_error == Some(true) {
401 if let Some(err_msg) = error {
402 yield ChatEvent::Err { message: err_msg };
403 return;
404 }
405 }
406 }
407 RawClaudeEvent::Unknown { event_type, data } => {
408 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
410 Ok(handle) => {
411 tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
412 yield ChatEvent::Passthrough { event_type, handle, data };
413 }
414 Err(e) => {
415 tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
416 yield ChatEvent::Passthrough {
418 event_type,
419 handle: "storage-failed".to_string(),
420 data,
421 };
422 }
423 }
424 }
425 RawClaudeEvent::User { .. } => {
426 }
428 }
429 }
430
431 let model_id = format!("claude-code-{}", config.model.as_str());
433 let assistant_msg = if is_ephemeral {
434 match storage.message_create_ephemeral(
435 &session_id,
436 MessageRole::Assistant,
437 response_content,
438 Some(model_id),
439 None,
440 None,
441 cost_usd,
442 ).await {
443 Ok(m) => m,
444 Err(e) => {
445 yield ChatEvent::Err { message: e.to_string() };
446 return;
447 }
448 }
449 } else {
450 match storage.message_create(
451 &session_id,
452 MessageRole::Assistant,
453 response_content,
454 Some(model_id),
455 None,
456 None,
457 cost_usd,
458 ).await {
459 Ok(m) => m,
460 Err(e) => {
461 yield ChatEvent::Err { message: e.to_string() };
462 return;
463 }
464 }
465 };
466
467 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
469 let assistant_node_id = if is_ephemeral {
470 match storage.arbor().node_create_external_ephemeral(
471 &config.head.tree_id,
472 Some(user_node_id),
473 assistant_handle,
474 None,
475 ).await {
476 Ok(id) => id,
477 Err(e) => {
478 yield ChatEvent::Err { message: e.to_string() };
479 return;
480 }
481 }
482 } else {
483 match storage.arbor().node_create_external(
484 &config.head.tree_id,
485 Some(user_node_id),
486 assistant_handle,
487 None,
488 ).await {
489 Ok(id) => id,
490 Err(e) => {
491 yield ChatEvent::Err { message: e.to_string() };
492 return;
493 }
494 }
495 };
496
497 let new_head = Position::new(config.head.tree_id, assistant_node_id);
498
499 if !is_ephemeral {
501 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
502 yield ChatEvent::Err { message: e.to_string() };
503 return;
504 }
505 }
506
507 yield ChatEvent::Complete {
510 new_head: if is_ephemeral { config.head } else { new_head },
511 claude_session_id: claude_session_id.unwrap_or_default(),
512 usage: Some(ChatUsage {
513 input_tokens: None,
514 output_tokens: None,
515 cost_usd,
516 num_turns,
517 }),
518 };
519 }
520 }
521
522 #[plexus_macros::hub_method]
524 async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
525 let result = self.storage.session_get_by_name(&name).await;
526
527 stream! {
528 match result {
529 Ok(config) => {
530 yield GetResult::Ok { config };
531 }
532 Err(e) => {
533 yield GetResult::Err { message: e.to_string() };
534 }
535 }
536 }
537 }
538
539 #[plexus_macros::hub_method]
541 async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
542 let storage = self.storage.clone();
543
544 stream! {
545 match storage.session_list().await {
546 Ok(sessions) => {
547 yield ListResult::Ok { sessions };
548 }
549 Err(e) => {
550 yield ListResult::Err { message: e.to_string() };
551 }
552 }
553 }
554 }
555
556 #[plexus_macros::hub_method]
558 async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
559 let storage = self.storage.clone();
560 let resolve_result = storage.session_get_by_name(&name).await;
561
562 stream! {
563 let config = match resolve_result {
564 Ok(c) => c,
565 Err(e) => {
566 yield DeleteResult::Err { message: e.to_string() };
567 return;
568 }
569 };
570
571 match storage.session_delete(&config.id).await {
572 Ok(_) => {
573 yield DeleteResult::Ok { id: config.id };
574 }
575 Err(e) => {
576 yield DeleteResult::Err { message: e.to_string() };
577 }
578 }
579 }
580 }
581
582 #[plexus_macros::hub_method]
584 async fn fork(
585 &self,
586 name: String,
587 new_name: String,
588 ) -> impl Stream<Item = ForkResult> + Send + 'static {
589 let storage = self.storage.clone();
590 let resolve_result = storage.session_get_by_name(&name).await;
591
592 stream! {
593 let parent = match resolve_result {
595 Ok(c) => c,
596 Err(e) => {
597 yield ForkResult::Err { message: e.to_string() };
598 return;
599 }
600 };
601
602 let new_config = match storage.session_create(
605 new_name,
606 parent.working_dir.clone(),
607 parent.model,
608 parent.system_prompt.clone(),
609 parent.mcp_config.clone(),
610 parent.loopback_enabled,
611 None,
612 ).await {
613 Ok(mut c) => {
614 if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
617 yield ForkResult::Err { message: e.to_string() };
618 return;
619 }
620 c.head = parent.head;
621 c
622 }
623 Err(e) => {
624 yield ForkResult::Err { message: e.to_string() };
625 return;
626 }
627 };
628
629 yield ForkResult::Ok {
630 id: new_config.id,
631 head: new_config.head,
632 };
633 }
634 }
635
636 #[plexus_macros::hub_method(
641 params(
642 name = "Session name to chat with",
643 prompt = "User message / prompt to send",
644 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
645 )
646 )]
647 async fn chat_async(
648 &self,
649 name: String,
650 prompt: String,
651 ephemeral: Option<bool>,
652 ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
653 let storage = self.storage.clone();
654 let executor = self.executor.clone();
655
656 let resolve_result = storage.session_get_by_name(&name).await;
658
659 stream! {
660 let is_ephemeral = ephemeral.unwrap_or(false);
661
662 let config = match resolve_result {
664 Ok(c) => c,
665 Err(e) => {
666 yield ChatStartResult::Err { message: e.to_string() };
667 return;
668 }
669 };
670
671 let session_id = config.id;
672
673 let stream_id = match storage.stream_create(session_id).await {
675 Ok(id) => id,
676 Err(e) => {
677 yield ChatStartResult::Err { message: e.to_string() };
678 return;
679 }
680 };
681
682 let storage_bg = storage.clone();
684 let executor_bg = executor.clone();
685 let prompt_bg = prompt.clone();
686 let config_bg = config.clone();
687 let stream_id_bg = stream_id;
688
689 tokio::spawn(async move {
690 Self::run_chat_background(
691 storage_bg,
692 executor_bg,
693 config_bg,
694 prompt_bg,
695 is_ephemeral,
696 stream_id_bg,
697 ).await;
698 }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
699
700 yield ChatStartResult::Ok {
702 stream_id,
703 session_id,
704 };
705 }
706 }
707
708 #[plexus_macros::hub_method(
713 params(
714 stream_id = "Stream ID returned from chat_async",
715 from_seq = "Optional: start reading from this sequence number",
716 limit = "Optional: max events to return (default 100)"
717 )
718 )]
719 async fn poll(
720 &self,
721 stream_id: StreamId,
722 from_seq: Option<u64>,
723 limit: Option<u64>,
724 ) -> impl Stream<Item = PollResult> + Send + 'static {
725 let storage = self.storage.clone();
726
727 stream! {
728 let limit_usize = limit.map(|l| l as usize);
729
730 match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
731 Ok((info, events)) => {
732 let has_more = info.read_position < info.event_count;
733 yield PollResult::Ok {
734 status: info.status,
735 events,
736 read_position: info.read_position,
737 total_events: info.event_count,
738 has_more,
739 };
740 }
741 Err(e) => {
742 yield PollResult::Err { message: e.to_string() };
743 }
744 }
745 }
746 }
747
748 #[plexus_macros::hub_method(
752 params(
753 session_id = "Optional: filter by session ID"
754 )
755 )]
756 async fn streams(
757 &self,
758 session_id: Option<ClaudeCodeId>,
759 ) -> impl Stream<Item = StreamListResult> + Send + 'static {
760 let storage = self.storage.clone();
761
762 stream! {
763 let streams = if let Some(sid) = session_id {
764 storage.stream_list_for_session(&sid).await
765 } else {
766 storage.stream_list().await
767 };
768
769 yield StreamListResult::Ok { streams };
770 }
771 }
772}
773
774impl<P: HubContext> ClaudeCode<P> {
776 async fn run_chat_background(
778 storage: Arc<ClaudeCodeStorage>,
779 executor: ClaudeCodeExecutor,
780 config: ClaudeCodeConfig,
781 prompt: String,
782 is_ephemeral: bool,
783 stream_id: StreamId,
784 ) {
785 let session_id = config.id;
786
787 let user_msg = if is_ephemeral {
789 match storage.message_create_ephemeral(
790 &session_id,
791 MessageRole::User,
792 prompt.clone(),
793 None, None, None, None,
794 ).await {
795 Ok(m) => m,
796 Err(e) => {
797 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
798 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
799 return;
800 }
801 }
802 } else {
803 match storage.message_create(
804 &session_id,
805 MessageRole::User,
806 prompt.clone(),
807 None, None, None, None,
808 ).await {
809 Ok(m) => m,
810 Err(e) => {
811 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
812 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
813 return;
814 }
815 }
816 };
817
818 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
820 let user_node_id = if is_ephemeral {
821 match storage.arbor().node_create_external_ephemeral(
822 &config.head.tree_id,
823 Some(config.head.node_id),
824 user_handle,
825 None,
826 ).await {
827 Ok(id) => id,
828 Err(e) => {
829 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
830 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
831 return;
832 }
833 }
834 } else {
835 match storage.arbor().node_create_external(
836 &config.head.tree_id,
837 Some(config.head.node_id),
838 user_handle,
839 None,
840 ).await {
841 Ok(id) => id,
842 Err(e) => {
843 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
844 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
845 return;
846 }
847 }
848 };
849
850 let user_position = Position::new(config.head.tree_id, user_node_id);
851
852 let _ = storage.stream_set_user_position(&stream_id, user_position).await;
854
855 let _ = storage.stream_push_event(&stream_id, ChatEvent::Start {
857 id: session_id,
858 user_position,
859 }).await;
860
861 let launch_config = LaunchConfig {
863 query: prompt,
864 session_id: config.claude_session_id.clone(),
865 fork_session: false,
866 model: config.model,
867 working_dir: config.working_dir.clone(),
868 system_prompt: config.system_prompt.clone(),
869 mcp_config: config.mcp_config.clone(),
870 loopback_enabled: config.loopback_enabled,
871 loopback_session_id: if config.loopback_enabled {
872 Some(session_id.to_string())
873 } else {
874 None
875 },
876 ..Default::default()
877 };
878
879 let mut response_content = String::new();
881 let mut claude_session_id = config.claude_session_id.clone();
882 let mut cost_usd = None;
883 let mut num_turns = None;
884
885 let mut raw_stream = executor.launch(launch_config).await;
886
887 let mut current_tool_id: Option<String> = None;
889 let mut current_tool_name: Option<String> = None;
890 let mut current_tool_input = String::new();
891
892 while let Some(event) = raw_stream.next().await {
893 match event {
894 RawClaudeEvent::System { session_id: sid, .. } => {
895 if let Some(id) = sid {
896 claude_session_id = Some(id);
897 }
898 }
899 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
900 if let Some(id) = sid {
901 claude_session_id = Some(id);
902 }
903 match inner {
904 StreamEventInner::ContentBlockDelta { delta, .. } => {
905 match delta {
906 StreamDelta::TextDelta { text } => {
907 response_content.push_str(&text);
908 let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
909 }
910 StreamDelta::InputJsonDelta { partial_json } => {
911 current_tool_input.push_str(&partial_json);
912 }
913 }
914 }
915 StreamEventInner::ContentBlockStart { content_block, .. } => {
916 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
917 current_tool_id = Some(id);
918 current_tool_name = Some(name);
919 current_tool_input.clear();
920 }
921 }
922 StreamEventInner::ContentBlockStop { .. } => {
923 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
924 let input: Value = serde_json::from_str(¤t_tool_input)
925 .unwrap_or(Value::Object(serde_json::Map::new()));
926
927 if name == "mcp__plexus__loopback_permit" {
929 let _ = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await;
930 }
931
932 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
933 tool_name: name,
934 tool_use_id: id,
935 input,
936 }).await;
937 current_tool_input.clear();
938 }
939 }
940 StreamEventInner::MessageDelta { delta } => {
941 if delta.stop_reason == Some("tool_use".to_string()) {
943 }
945 }
946 _ => {}
947 }
948 }
949 RawClaudeEvent::Assistant { message } => {
950 if let Some(msg) = message {
951 if let Some(content) = msg.content {
952 for block in content {
953 match block {
954 RawContentBlock::Text { text } => {
955 if response_content.is_empty() {
956 response_content.push_str(&text);
957 let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
958 }
959 }
960 RawContentBlock::ToolUse { id, name, input } => {
961 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
962 tool_name: name,
963 tool_use_id: id,
964 input,
965 }).await;
966 }
967 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
968 let _ = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await;
970 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
971 tool_use_id,
972 output: content.unwrap_or_default(),
973 is_error: is_error.unwrap_or(false),
974 }).await;
975 }
976 RawContentBlock::Thinking { thinking, .. } => {
977 let _ = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await;
978 }
979 }
980 }
981 }
982 }
983 }
984 RawClaudeEvent::Result {
985 session_id: sid,
986 cost_usd: cost,
987 num_turns: turns,
988 is_error,
989 error,
990 ..
991 } => {
992 if let Some(id) = sid {
993 claude_session_id = Some(id);
994 }
995 cost_usd = cost;
996 num_turns = turns;
997
998 if is_error == Some(true) {
999 if let Some(err_msg) = error {
1000 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await;
1001 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await;
1002 return;
1003 }
1004 }
1005 }
1006 RawClaudeEvent::Unknown { event_type, data } => {
1007 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1008 Ok(handle) => {
1009 let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await;
1010 }
1011 Err(_) => {
1012 let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1013 event_type,
1014 handle: "storage-failed".to_string(),
1015 data,
1016 }).await;
1017 }
1018 }
1019 }
1020 RawClaudeEvent::User { .. } => {}
1021 }
1022 }
1023
1024 let model_id = format!("claude-code-{}", config.model.as_str());
1026 let assistant_msg = if is_ephemeral {
1027 match storage.message_create_ephemeral(
1028 &session_id,
1029 MessageRole::Assistant,
1030 response_content,
1031 Some(model_id),
1032 None,
1033 None,
1034 cost_usd,
1035 ).await {
1036 Ok(m) => m,
1037 Err(e) => {
1038 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1039 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1040 return;
1041 }
1042 }
1043 } else {
1044 match storage.message_create(
1045 &session_id,
1046 MessageRole::Assistant,
1047 response_content,
1048 Some(model_id),
1049 None,
1050 None,
1051 cost_usd,
1052 ).await {
1053 Ok(m) => m,
1054 Err(e) => {
1055 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1056 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1057 return;
1058 }
1059 }
1060 };
1061
1062 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1064 let assistant_node_id = if is_ephemeral {
1065 match storage.arbor().node_create_external_ephemeral(
1066 &config.head.tree_id,
1067 Some(user_node_id),
1068 assistant_handle,
1069 None,
1070 ).await {
1071 Ok(id) => id,
1072 Err(e) => {
1073 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1074 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1075 return;
1076 }
1077 }
1078 } else {
1079 match storage.arbor().node_create_external(
1080 &config.head.tree_id,
1081 Some(user_node_id),
1082 assistant_handle,
1083 None,
1084 ).await {
1085 Ok(id) => id,
1086 Err(e) => {
1087 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1088 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1089 return;
1090 }
1091 }
1092 };
1093
1094 let new_head = Position::new(config.head.tree_id, assistant_node_id);
1095
1096 if !is_ephemeral {
1098 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1099 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1100 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1101 return;
1102 }
1103 }
1104
1105 let _ = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1107 new_head: if is_ephemeral { config.head } else { new_head },
1108 claude_session_id: claude_session_id.unwrap_or_default(),
1109 usage: Some(ChatUsage {
1110 input_tokens: None,
1111 output_tokens: None,
1112 cost_usd,
1113 num_turns,
1114 }),
1115 }).await;
1116
1117 let _ = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await;
1118 }
1119}