1use super::{
2 executor::{ClaudeCodeExecutor, LaunchConfig},
3 sessions,
4 storage::ClaudeCodeStorage,
5 types::*,
6};
7use crate::activations::arbor::{NodeId, TreeId};
8use crate::plexus::{HubContext, NoParent};
9use async_stream::stream;
10use futures::{Stream, StreamExt};
11use plexus_macros::hub_methods;
12use serde_json::Value;
13use std::marker::PhantomData;
14use std::sync::{Arc, OnceLock};
15use tracing::Instrument;
16
17#[derive(Clone)]
24pub struct ClaudeCode<P: HubContext = NoParent> {
25 pub storage: Arc<ClaudeCodeStorage>,
26 executor: ClaudeCodeExecutor,
27 hub: Arc<OnceLock<P>>,
29 _phantom: PhantomData<P>,
30}
31
32impl<P: HubContext> ClaudeCode<P> {
33 pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
35 Self {
36 storage,
37 executor: ClaudeCodeExecutor::new(),
38 hub: Arc::new(OnceLock::new()),
39 _phantom: PhantomData,
40 }
41 }
42
43 pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
45 Self {
46 storage,
47 executor,
48 hub: Arc::new(OnceLock::new()),
49 _phantom: PhantomData,
50 }
51 }
52
53 pub fn inject_parent(&self, parent: P) {
58 let _ = self.hub.set(parent);
59 }
60
61 pub fn has_parent(&self) -> bool {
63 self.hub.get().is_some()
64 }
65
66 pub fn parent(&self) -> Option<&P> {
70 self.hub.get()
71 }
72
73 pub async fn resolve_handle_impl(
78 &self,
79 handle: &crate::types::Handle,
80 ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
81 use crate::plexus::{PlexusError, wrap_stream};
82 use async_stream::stream;
83
84 let storage = self.storage.clone();
85
86 if handle.meta.is_empty() {
89 return Err(PlexusError::ExecutionError(
90 "ClaudeCode handle missing message ID in meta".to_string()
91 ));
92 }
93 let identifier = handle.meta.join(":");
94
95 let name = handle.meta.get(2).cloned();
97
98 let result_stream = stream! {
99 match storage.resolve_message_handle(&identifier).await {
100 Ok(message) => {
101 yield ResolveResult::Message {
102 id: message.id.to_string(),
103 role: message.role.as_str().to_string(),
104 content: message.content,
105 model: message.model_id,
106 name: name.unwrap_or_else(|| message.role.as_str().to_string()),
107 };
108 }
109 Err(e) => {
110 yield ResolveResult::Error {
111 message: format!("Failed to resolve handle: {}", e.message),
112 };
113 }
114 }
115 };
116
117 Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
118 }
119}
120
121impl ClaudeCode<NoParent> {
123 pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
124 Self::with_context_type(storage)
125 }
126
127 pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
128 Self::with_executor_and_context(storage, executor)
129 }
130}
131
132async fn create_event_node(
138 arbor: &crate::activations::arbor::ArborStorage,
139 tree_id: &crate::activations::arbor::TreeId,
140 parent_id: &crate::activations::arbor::NodeId,
141 event: &NodeEvent,
142) -> Result<crate::activations::arbor::NodeId, String> {
143 let json = serde_json::to_string(event)
144 .map_err(|e| format!("Failed to serialize event: {}", e))?;
145
146 arbor.node_create_text(tree_id, Some(*parent_id), json, None)
147 .await
148 .map_err(|e| e.to_string())
149}
150
151#[hub_methods(
152 namespace = "claudecode",
153 version = "1.0.0",
154 description = "Manage Claude Code sessions with Arbor-backed conversation history",
155 resolve_handle
156)]
157impl<P: HubContext> ClaudeCode<P> {
158 #[plexus_macros::hub_method(params(
160 name = "Human-readable name for the session",
161 working_dir = "Working directory for Claude Code",
162 model = "Model to use (opus, sonnet, haiku)",
163 system_prompt = "Optional system prompt / instructions",
164 loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval"
165 ))]
166 pub async fn create(
167 &self,
168 name: String,
169 working_dir: String,
170 model: Model,
171 system_prompt: Option<String>,
172 loopback_enabled: Option<bool>,
173 ) -> impl Stream<Item = CreateResult> + Send + 'static {
174 let storage = self.storage.clone();
175 let loopback = loopback_enabled.unwrap_or(false);
176
177 stream! {
178 match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None).await {
179 Ok(config) => {
180 yield CreateResult::Ok {
181 id: config.id,
182 head: config.head,
183 };
184 }
185 Err(e) => {
186 yield CreateResult::Err { message: e.to_string() };
187 }
188 }
189 }
190 }
191
192 #[plexus_macros::hub_method(
194 streaming,
195 params(
196 name = "Session name to chat with",
197 prompt = "User message / prompt to send",
198 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
199 )
200 )]
201 pub async fn chat(
202 &self,
203 name: String,
204 prompt: String,
205 ephemeral: Option<bool>,
206 ) -> impl Stream<Item = ChatEvent> + Send + 'static {
207 let storage = self.storage.clone();
208 let executor = self.executor.clone();
209
210 let resolve_result = storage.session_get_by_name(&name).await;
212
213 stream! {
214 let is_ephemeral = ephemeral.unwrap_or(false);
215
216 let config = match resolve_result {
218 Ok(c) => c,
219 Err(e) => {
220 yield ChatEvent::Err { message: e.to_string() };
221 return;
222 }
223 };
224
225 let session_id = config.id;
226
227 let user_msg = if is_ephemeral {
229 match storage.message_create_ephemeral(
230 &session_id,
231 MessageRole::User,
232 prompt.clone(),
233 None, None, None, None,
234 ).await {
235 Ok(m) => m,
236 Err(e) => {
237 yield ChatEvent::Err { message: e.to_string() };
238 return;
239 }
240 }
241 } else {
242 match storage.message_create(
243 &session_id,
244 MessageRole::User,
245 prompt.clone(),
246 None, None, None, None,
247 ).await {
248 Ok(m) => m,
249 Err(e) => {
250 yield ChatEvent::Err { message: e.to_string() };
251 return;
252 }
253 }
254 };
255
256 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
258 let user_node_id = if is_ephemeral {
259 match storage.arbor().node_create_external_ephemeral(
260 &config.head.tree_id,
261 Some(config.head.node_id),
262 user_handle,
263 None,
264 ).await {
265 Ok(id) => id,
266 Err(e) => {
267 yield ChatEvent::Err { message: e.to_string() };
268 return;
269 }
270 }
271 } else {
272 match storage.arbor().node_create_external(
273 &config.head.tree_id,
274 Some(config.head.node_id),
275 user_handle,
276 None,
277 ).await {
278 Ok(id) => id,
279 Err(e) => {
280 yield ChatEvent::Err { message: e.to_string() };
281 return;
282 }
283 }
284 };
285
286 let user_position = Position::new(config.head.tree_id, user_node_id);
287
288 let mut current_parent = user_node_id;
290
291 let user_event = NodeEvent::UserMessage { content: prompt.clone() };
293 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &user_event).await {
294 current_parent = node_id;
295 }
296
297 let start_event = NodeEvent::AssistantStart;
299 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &start_event).await {
300 current_parent = node_id;
301 }
302
303 yield ChatEvent::Start {
305 id: session_id,
306 user_position,
307 };
308
309 let launch_config = LaunchConfig {
311 query: prompt,
312 session_id: config.claude_session_id.clone(),
313 fork_session: false,
314 model: config.model,
315 working_dir: config.working_dir.clone(),
316 system_prompt: config.system_prompt.clone(),
317 mcp_config: config.mcp_config.clone(),
318 loopback_enabled: config.loopback_enabled,
319 loopback_session_id: if config.loopback_enabled {
320 Some(session_id.to_string())
321 } else {
322 None
323 },
324 ..Default::default()
325 };
326
327 let mut response_content = String::new();
329 let mut claude_session_id = config.claude_session_id.clone();
330 let mut cost_usd = None;
331 let mut num_turns = None;
332
333 let mut raw_stream = executor.launch(launch_config).await;
334
335 let mut current_tool_id: Option<String> = None;
337 let mut current_tool_name: Option<String> = None;
338 let mut current_tool_input = String::new();
339
340 while let Some(event) = raw_stream.next().await {
341 match event {
342 RawClaudeEvent::System { session_id: sid, .. } => {
343 if let Some(id) = sid {
344 claude_session_id = Some(id);
345 }
346 }
347 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
348 if let Some(id) = sid {
349 claude_session_id = Some(id);
350 }
351 match inner {
352 StreamEventInner::ContentBlockDelta { delta, .. } => {
353 match delta {
354 StreamDelta::TextDelta { text } => {
355 response_content.push_str(&text);
356
357 let event = NodeEvent::ContentText { text: text.clone() };
359 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
360 current_parent = node_id;
361 }
362
363 yield ChatEvent::Content { text };
364 }
365 StreamDelta::InputJsonDelta { partial_json } => {
366 current_tool_input.push_str(&partial_json);
367 }
368 }
369 }
370 StreamEventInner::ContentBlockStart { content_block, .. } => {
371 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
372 current_tool_id = Some(id);
373 current_tool_name = Some(name);
374 current_tool_input.clear();
375 }
376 }
377 StreamEventInner::ContentBlockStop { .. } => {
378 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
380 let input: Value = serde_json::from_str(¤t_tool_input)
381 .unwrap_or(Value::Object(serde_json::Map::new()));
382
383 let event = NodeEvent::ContentToolUse {
385 id: id.clone(),
386 name: name.clone(),
387 input: input.clone(),
388 };
389 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
390 current_parent = node_id;
391 }
392
393 yield ChatEvent::ToolUse {
394 tool_name: name,
395 tool_use_id: id,
396 input,
397 };
398 current_tool_input.clear();
399 }
400 }
401 _ => {}
402 }
403 }
404 RawClaudeEvent::Assistant { message } => {
405 if let Some(msg) = message {
407 if let Some(content) = msg.content {
408 for block in content {
409 match block {
410 RawContentBlock::Text { text } => {
411 if response_content.is_empty() {
413 response_content.push_str(&text);
414
415 let event = NodeEvent::ContentText { text: text.clone() };
417 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
418 current_parent = node_id;
419 }
420
421 yield ChatEvent::Content { text };
422 }
423 }
424 RawContentBlock::ToolUse { id, name, input } => {
425 let event = NodeEvent::ContentToolUse {
427 id: id.clone(),
428 name: name.clone(),
429 input: input.clone(),
430 };
431 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
432 current_parent = node_id;
433 }
434
435 yield ChatEvent::ToolUse {
436 tool_name: name,
437 tool_use_id: id,
438 input,
439 };
440 }
441 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
442 let event = NodeEvent::UserToolResult {
444 tool_use_id: tool_use_id.clone(),
445 content: content.clone().unwrap_or_default(),
446 is_error: is_error.unwrap_or(false),
447 };
448 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
449 current_parent = node_id;
450 }
451
452 yield ChatEvent::ToolResult {
453 tool_use_id,
454 output: content.unwrap_or_default(),
455 is_error: is_error.unwrap_or(false),
456 };
457 }
458 RawContentBlock::Thinking { thinking, .. } => {
459 let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
461 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
462 current_parent = node_id;
463 }
464
465 yield ChatEvent::Thinking { thinking };
466 }
467 }
468 }
469 }
470 }
471 }
472 RawClaudeEvent::Result {
473 session_id: sid,
474 cost_usd: cost,
475 num_turns: turns,
476 is_error,
477 error,
478 ..
479 } => {
480 if let Some(id) = sid {
481 claude_session_id = Some(id);
482 }
483 cost_usd = cost;
484 num_turns = turns;
485
486 if is_error == Some(true) {
488 if let Some(err_msg) = error {
489 yield ChatEvent::Err { message: err_msg };
490 return;
491 }
492 }
493 }
494 RawClaudeEvent::Unknown { event_type, data } => {
495 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
497 Ok(handle) => {
498 tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
499 yield ChatEvent::Passthrough { event_type, handle, data };
500 }
501 Err(e) => {
502 tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
503 yield ChatEvent::Passthrough {
505 event_type,
506 handle: "storage-failed".to_string(),
507 data,
508 };
509 }
510 }
511 }
512 RawClaudeEvent::User { .. } => {
513 }
515 }
516 }
517
518 let model_id = format!("claude-code-{}", config.model.as_str());
520 let assistant_msg = if is_ephemeral {
521 match storage.message_create_ephemeral(
522 &session_id,
523 MessageRole::Assistant,
524 response_content,
525 Some(model_id),
526 None,
527 None,
528 cost_usd,
529 ).await {
530 Ok(m) => m,
531 Err(e) => {
532 yield ChatEvent::Err { message: e.to_string() };
533 return;
534 }
535 }
536 } else {
537 match storage.message_create(
538 &session_id,
539 MessageRole::Assistant,
540 response_content,
541 Some(model_id),
542 None,
543 None,
544 cost_usd,
545 ).await {
546 Ok(m) => m,
547 Err(e) => {
548 yield ChatEvent::Err { message: e.to_string() };
549 return;
550 }
551 }
552 };
553
554 let complete_event = NodeEvent::AssistantComplete { usage: None };
556 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &complete_event).await {
557 current_parent = node_id;
558 }
559
560 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
562 let assistant_node_id = if is_ephemeral {
563 match storage.arbor().node_create_external_ephemeral(
564 &config.head.tree_id,
565 Some(current_parent),
566 assistant_handle,
567 None,
568 ).await {
569 Ok(id) => id,
570 Err(e) => {
571 yield ChatEvent::Err { message: e.to_string() };
572 return;
573 }
574 }
575 } else {
576 match storage.arbor().node_create_external(
577 &config.head.tree_id,
578 Some(current_parent),
579 assistant_handle,
580 None,
581 ).await {
582 Ok(id) => id,
583 Err(e) => {
584 yield ChatEvent::Err { message: e.to_string() };
585 return;
586 }
587 }
588 };
589
590 let new_head = Position::new(config.head.tree_id, assistant_node_id);
591
592 if !is_ephemeral {
594 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
595 yield ChatEvent::Err { message: e.to_string() };
596 return;
597 }
598 }
599
600 yield ChatEvent::Complete {
603 new_head: if is_ephemeral { config.head } else { new_head },
604 claude_session_id: claude_session_id.unwrap_or_default(),
605 usage: Some(ChatUsage {
606 input_tokens: None,
607 output_tokens: None,
608 cost_usd,
609 num_turns,
610 }),
611 };
612 }
613 }
614
615 #[plexus_macros::hub_method]
617 async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
618 let result = self.storage.session_get_by_name(&name).await;
619
620 stream! {
621 match result {
622 Ok(config) => {
623 yield GetResult::Ok { config };
624 }
625 Err(e) => {
626 yield GetResult::Err { message: e.to_string() };
627 }
628 }
629 }
630 }
631
632 #[plexus_macros::hub_method]
634 async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
635 let storage = self.storage.clone();
636
637 stream! {
638 match storage.session_list().await {
639 Ok(sessions) => {
640 yield ListResult::Ok { sessions };
641 }
642 Err(e) => {
643 yield ListResult::Err { message: e.to_string() };
644 }
645 }
646 }
647 }
648
649 #[plexus_macros::hub_method]
651 async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
652 let storage = self.storage.clone();
653 let resolve_result = storage.session_get_by_name(&name).await;
654
655 stream! {
656 let config = match resolve_result {
657 Ok(c) => c,
658 Err(e) => {
659 yield DeleteResult::Err { message: e.to_string() };
660 return;
661 }
662 };
663
664 match storage.session_delete(&config.id).await {
665 Ok(_) => {
666 yield DeleteResult::Ok { id: config.id };
667 }
668 Err(e) => {
669 yield DeleteResult::Err { message: e.to_string() };
670 }
671 }
672 }
673 }
674
675 #[plexus_macros::hub_method]
677 async fn fork(
678 &self,
679 name: String,
680 new_name: String,
681 ) -> impl Stream<Item = ForkResult> + Send + 'static {
682 let storage = self.storage.clone();
683 let resolve_result = storage.session_get_by_name(&name).await;
684
685 stream! {
686 let parent = match resolve_result {
688 Ok(c) => c,
689 Err(e) => {
690 yield ForkResult::Err { message: e.to_string() };
691 return;
692 }
693 };
694
695 let new_config = match storage.session_create(
698 new_name,
699 parent.working_dir.clone(),
700 parent.model,
701 parent.system_prompt.clone(),
702 parent.mcp_config.clone(),
703 parent.loopback_enabled,
704 None,
705 ).await {
706 Ok(mut c) => {
707 if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
710 yield ForkResult::Err { message: e.to_string() };
711 return;
712 }
713 c.head = parent.head;
714 c
715 }
716 Err(e) => {
717 yield ForkResult::Err { message: e.to_string() };
718 return;
719 }
720 };
721
722 yield ForkResult::Ok {
723 id: new_config.id,
724 head: new_config.head,
725 };
726 }
727 }
728
729 #[plexus_macros::hub_method(
734 params(
735 name = "Session name to chat with",
736 prompt = "User message / prompt to send",
737 ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
738 )
739 )]
740 async fn chat_async(
741 &self,
742 name: String,
743 prompt: String,
744 ephemeral: Option<bool>,
745 ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
746 let storage = self.storage.clone();
747 let executor = self.executor.clone();
748
749 let resolve_result = storage.session_get_by_name(&name).await;
751
752 stream! {
753 let is_ephemeral = ephemeral.unwrap_or(false);
754
755 let config = match resolve_result {
757 Ok(c) => c,
758 Err(e) => {
759 yield ChatStartResult::Err { message: e.to_string() };
760 return;
761 }
762 };
763
764 let session_id = config.id;
765
766 let stream_id = match storage.stream_create(session_id).await {
768 Ok(id) => id,
769 Err(e) => {
770 yield ChatStartResult::Err { message: e.to_string() };
771 return;
772 }
773 };
774
775 let storage_bg = storage.clone();
777 let executor_bg = executor.clone();
778 let prompt_bg = prompt.clone();
779 let config_bg = config.clone();
780 let stream_id_bg = stream_id;
781
782 tokio::spawn(async move {
783 Self::run_chat_background(
784 storage_bg,
785 executor_bg,
786 config_bg,
787 prompt_bg,
788 is_ephemeral,
789 stream_id_bg,
790 ).await;
791 }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
792
793 yield ChatStartResult::Ok {
795 stream_id,
796 session_id,
797 };
798 }
799 }
800
801 #[plexus_macros::hub_method(
806 params(
807 stream_id = "Stream ID returned from chat_async",
808 from_seq = "Optional: start reading from this sequence number",
809 limit = "Optional: max events to return (default 100)"
810 )
811 )]
812 async fn poll(
813 &self,
814 stream_id: StreamId,
815 from_seq: Option<u64>,
816 limit: Option<u64>,
817 ) -> impl Stream<Item = PollResult> + Send + 'static {
818 let storage = self.storage.clone();
819
820 stream! {
821 let limit_usize = limit.map(|l| l as usize);
822
823 match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
824 Ok((info, events)) => {
825 let has_more = info.read_position < info.event_count;
826 yield PollResult::Ok {
827 status: info.status,
828 events,
829 read_position: info.read_position,
830 total_events: info.event_count,
831 has_more,
832 };
833 }
834 Err(e) => {
835 yield PollResult::Err { message: e.to_string() };
836 }
837 }
838 }
839 }
840
841 #[plexus_macros::hub_method(
845 params(
846 session_id = "Optional: filter by session ID"
847 )
848 )]
849 async fn streams(
850 &self,
851 session_id: Option<ClaudeCodeId>,
852 ) -> impl Stream<Item = StreamListResult> + Send + 'static {
853 let storage = self.storage.clone();
854
855 stream! {
856 let streams = if let Some(sid) = session_id {
857 storage.stream_list_for_session(&sid).await
858 } else {
859 storage.stream_list().await
860 };
861
862 yield StreamListResult::Ok { streams };
863 }
864 }
865
866 #[plexus_macros::hub_method(params(
868 name = "Session name"
869 ))]
870 async fn get_tree(
871 &self,
872 name: String,
873 ) -> impl Stream<Item = GetTreeResult> + Send + 'static {
874 let storage = self.storage.clone();
875
876 stream! {
877 let config = match storage.session_get_by_name(&name).await {
878 Ok(c) => c,
879 Err(e) => {
880 yield GetTreeResult::Err { message: e.to_string() };
881 return;
882 }
883 };
884
885 yield GetTreeResult::Ok {
886 tree_id: config.head.tree_id,
887 head: config.head.node_id,
888 };
889 }
890 }
891
892 #[plexus_macros::hub_method(params(
894 name = "Session name",
895 start = "Optional start node (default: root)",
896 end = "Optional end node (default: head)"
897 ))]
898 async fn render_context(
899 &self,
900 name: String,
901 start: Option<NodeId>,
902 end: Option<NodeId>,
903 ) -> impl Stream<Item = RenderResult> + Send + 'static {
904 let storage = self.storage.clone();
905
906 stream! {
907 let config = match storage.session_get_by_name(&name).await {
909 Ok(c) => c,
910 Err(e) => {
911 yield RenderResult::Err { message: e.to_string() };
912 return;
913 }
914 };
915
916 let tree_id = config.head.tree_id;
917 let end_node = end.unwrap_or(config.head.node_id);
918
919 let start_node = if let Some(s) = start {
921 s
922 } else {
923 match storage.arbor().tree_get(&tree_id).await {
924 Ok(tree) => tree.root,
925 Err(e) => {
926 yield RenderResult::Err { message: e.to_string() };
927 return;
928 }
929 }
930 };
931
932 let messages = match storage.render_messages(&tree_id, &start_node, &end_node).await {
934 Ok(m) => m,
935 Err(e) => {
936 yield RenderResult::Err { message: e.to_string() };
937 return;
938 }
939 };
940
941 yield RenderResult::Ok { messages };
942 }
943 }
944
945 #[plexus_macros::hub_method(params(
947 project_path = "Project path (e.g., '-workspace-hypermemetic-hub-codegen')"
948 ))]
949 async fn sessions_list(
950 &self,
951 project_path: String,
952 ) -> impl Stream<Item = SessionsListResult> + Send + 'static {
953 stream! {
954 match sessions::list_sessions(&project_path).await {
955 Ok(sessions) => {
956 yield SessionsListResult::Ok { sessions };
957 }
958 Err(e) => {
959 yield SessionsListResult::Err { message: e };
960 }
961 }
962 }
963 }
964
965 #[plexus_macros::hub_method(params(
967 project_path = "Project path",
968 session_id = "Session ID (UUID)"
969 ))]
970 async fn sessions_get(
971 &self,
972 project_path: String,
973 session_id: String,
974 ) -> impl Stream<Item = SessionsGetResult> + Send + 'static {
975 stream! {
976 match sessions::read_session(&project_path, &session_id).await {
977 Ok(events) => {
978 let event_count = events.len();
979 let events_json: Vec<serde_json::Value> = events.into_iter()
981 .filter_map(|e| serde_json::to_value(e).ok())
982 .collect();
983
984 yield SessionsGetResult::Ok {
985 session_id: session_id.clone(),
986 event_count,
987 events: events_json,
988 };
989 }
990 Err(e) => {
991 yield SessionsGetResult::Err { message: e };
992 }
993 }
994 }
995 }
996
997 #[plexus_macros::hub_method(params(
999 project_path = "Project path",
1000 session_id = "Session ID to import",
1001 owner_id = "Owner ID for the new tree (default: 'claudecode')"
1002 ))]
1003 async fn sessions_import(
1004 &self,
1005 project_path: String,
1006 session_id: String,
1007 owner_id: Option<String>,
1008 ) -> impl Stream<Item = SessionsImportResult> + Send + 'static {
1009 let storage = self.storage.clone();
1010
1011 stream! {
1012 let owner = owner_id.unwrap_or_else(|| "claudecode".to_string());
1013
1014 match sessions::import_to_arbor(storage.arbor(), &project_path, &session_id, &owner).await {
1015 Ok(tree_id) => {
1016 yield SessionsImportResult::Ok {
1017 tree_id,
1018 session_id,
1019 };
1020 }
1021 Err(e) => {
1022 yield SessionsImportResult::Err { message: e };
1023 }
1024 }
1025 }
1026 }
1027
1028 #[plexus_macros::hub_method(params(
1030 tree_id = "Arbor tree ID to export",
1031 project_path = "Project path",
1032 session_id = "Session ID for the exported file"
1033 ))]
1034 async fn sessions_export(
1035 &self,
1036 tree_id: TreeId,
1037 project_path: String,
1038 session_id: String,
1039 ) -> impl Stream<Item = SessionsExportResult> + Send + 'static {
1040 let storage = self.storage.clone();
1041
1042 stream! {
1043 match sessions::export_from_arbor(storage.arbor(), &tree_id, &project_path, &session_id).await {
1044 Ok(()) => {
1045 yield SessionsExportResult::Ok {
1046 tree_id,
1047 session_id,
1048 };
1049 }
1050 Err(e) => {
1051 yield SessionsExportResult::Err { message: e };
1052 }
1053 }
1054 }
1055 }
1056
1057 #[plexus_macros::hub_method(params(
1059 project_path = "Project path",
1060 session_id = "Session ID to delete"
1061 ))]
1062 async fn sessions_delete(
1063 &self,
1064 project_path: String,
1065 session_id: String,
1066 ) -> impl Stream<Item = SessionsDeleteResult> + Send + 'static {
1067 stream! {
1068 match sessions::delete_session(&project_path, &session_id).await {
1069 Ok(()) => {
1070 yield SessionsDeleteResult::Ok {
1071 session_id,
1072 deleted: true,
1073 };
1074 }
1075 Err(e) => {
1076 yield SessionsDeleteResult::Err { message: e };
1077 }
1078 }
1079 }
1080 }
1081}
1082
1083impl<P: HubContext> ClaudeCode<P> {
1085 async fn run_chat_background(
1087 storage: Arc<ClaudeCodeStorage>,
1088 executor: ClaudeCodeExecutor,
1089 config: ClaudeCodeConfig,
1090 prompt: String,
1091 is_ephemeral: bool,
1092 stream_id: StreamId,
1093 ) {
1094 let session_id = config.id;
1095
1096 let user_msg = if is_ephemeral {
1098 match storage.message_create_ephemeral(
1099 &session_id,
1100 MessageRole::User,
1101 prompt.clone(),
1102 None, None, None, None,
1103 ).await {
1104 Ok(m) => m,
1105 Err(e) => {
1106 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1107 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1108 return;
1109 }
1110 }
1111 } else {
1112 match storage.message_create(
1113 &session_id,
1114 MessageRole::User,
1115 prompt.clone(),
1116 None, None, None, None,
1117 ).await {
1118 Ok(m) => m,
1119 Err(e) => {
1120 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1121 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1122 return;
1123 }
1124 }
1125 };
1126
1127 let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
1129 let user_node_id = if is_ephemeral {
1130 match storage.arbor().node_create_external_ephemeral(
1131 &config.head.tree_id,
1132 Some(config.head.node_id),
1133 user_handle,
1134 None,
1135 ).await {
1136 Ok(id) => id,
1137 Err(e) => {
1138 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1139 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1140 return;
1141 }
1142 }
1143 } else {
1144 match storage.arbor().node_create_external(
1145 &config.head.tree_id,
1146 Some(config.head.node_id),
1147 user_handle,
1148 None,
1149 ).await {
1150 Ok(id) => id,
1151 Err(e) => {
1152 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1153 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1154 return;
1155 }
1156 }
1157 };
1158
1159 let user_position = Position::new(config.head.tree_id, user_node_id);
1160
1161 let mut current_parent = user_node_id;
1163
1164 let user_event = NodeEvent::UserMessage { content: prompt.clone() };
1166 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &user_event).await {
1167 current_parent = node_id;
1168 }
1169
1170 let start_event = NodeEvent::AssistantStart;
1172 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &start_event).await {
1173 current_parent = node_id;
1174 }
1175
1176 let _ = storage.stream_set_user_position(&stream_id, user_position).await;
1178
1179 let _ = storage.stream_push_event(&stream_id, ChatEvent::Start {
1181 id: session_id,
1182 user_position,
1183 }).await;
1184
1185 let launch_config = LaunchConfig {
1187 query: prompt,
1188 session_id: config.claude_session_id.clone(),
1189 fork_session: false,
1190 model: config.model,
1191 working_dir: config.working_dir.clone(),
1192 system_prompt: config.system_prompt.clone(),
1193 mcp_config: config.mcp_config.clone(),
1194 loopback_enabled: config.loopback_enabled,
1195 loopback_session_id: if config.loopback_enabled {
1196 Some(session_id.to_string())
1197 } else {
1198 None
1199 },
1200 ..Default::default()
1201 };
1202
1203 let mut response_content = String::new();
1205 let mut claude_session_id = config.claude_session_id.clone();
1206 let mut cost_usd = None;
1207 let mut num_turns = None;
1208
1209 let mut raw_stream = executor.launch(launch_config).await;
1210
1211 let mut current_tool_id: Option<String> = None;
1213 let mut current_tool_name: Option<String> = None;
1214 let mut current_tool_input = String::new();
1215
1216 while let Some(event) = raw_stream.next().await {
1217 match event {
1218 RawClaudeEvent::System { session_id: sid, .. } => {
1219 if let Some(id) = sid {
1220 claude_session_id = Some(id);
1221 }
1222 }
1223 RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
1224 if let Some(id) = sid {
1225 claude_session_id = Some(id);
1226 }
1227 match inner {
1228 StreamEventInner::ContentBlockDelta { delta, .. } => {
1229 match delta {
1230 StreamDelta::TextDelta { text } => {
1231 response_content.push_str(&text);
1232
1233 let event = NodeEvent::ContentText { text: text.clone() };
1235 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1236 current_parent = node_id;
1237 }
1238
1239 let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
1240 }
1241 StreamDelta::InputJsonDelta { partial_json } => {
1242 current_tool_input.push_str(&partial_json);
1243 }
1244 }
1245 }
1246 StreamEventInner::ContentBlockStart { content_block, .. } => {
1247 if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
1248 current_tool_id = Some(id);
1249 current_tool_name = Some(name);
1250 current_tool_input.clear();
1251 }
1252 }
1253 StreamEventInner::ContentBlockStop { .. } => {
1254 if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
1255 let input: Value = serde_json::from_str(¤t_tool_input)
1256 .unwrap_or(Value::Object(serde_json::Map::new()));
1257
1258 if name == "mcp__plexus__loopback_permit" {
1260 let _ = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await;
1261 }
1262
1263 let event = NodeEvent::ContentToolUse {
1265 id: id.clone(),
1266 name: name.clone(),
1267 input: input.clone(),
1268 };
1269 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1270 current_parent = node_id;
1271 }
1272
1273 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1274 tool_name: name,
1275 tool_use_id: id,
1276 input,
1277 }).await;
1278 current_tool_input.clear();
1279 }
1280 }
1281 StreamEventInner::MessageDelta { delta } => {
1282 if delta.stop_reason == Some("tool_use".to_string()) {
1284 }
1286 }
1287 _ => {}
1288 }
1289 }
1290 RawClaudeEvent::Assistant { message } => {
1291 if let Some(msg) = message {
1292 if let Some(content) = msg.content {
1293 for block in content {
1294 match block {
1295 RawContentBlock::Text { text } => {
1296 if response_content.is_empty() {
1297 response_content.push_str(&text);
1298
1299 let event = NodeEvent::ContentText { text: text.clone() };
1301 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1302 current_parent = node_id;
1303 }
1304
1305 let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
1306 }
1307 }
1308 RawContentBlock::ToolUse { id, name, input } => {
1309 let event = NodeEvent::ContentToolUse {
1311 id: id.clone(),
1312 name: name.clone(),
1313 input: input.clone(),
1314 };
1315 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1316 current_parent = node_id;
1317 }
1318
1319 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1320 tool_name: name,
1321 tool_use_id: id,
1322 input,
1323 }).await;
1324 }
1325 RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
1326 let _ = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await;
1328
1329 let event = NodeEvent::UserToolResult {
1331 tool_use_id: tool_use_id.clone(),
1332 content: content.clone().unwrap_or_default(),
1333 is_error: is_error.unwrap_or(false),
1334 };
1335 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1336 current_parent = node_id;
1337 }
1338
1339 let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
1340 tool_use_id,
1341 output: content.unwrap_or_default(),
1342 is_error: is_error.unwrap_or(false),
1343 }).await;
1344 }
1345 RawContentBlock::Thinking { thinking, .. } => {
1346 let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
1348 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &event).await {
1349 current_parent = node_id;
1350 }
1351
1352 let _ = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await;
1353 }
1354 }
1355 }
1356 }
1357 }
1358 }
1359 RawClaudeEvent::Result {
1360 session_id: sid,
1361 cost_usd: cost,
1362 num_turns: turns,
1363 is_error,
1364 error,
1365 ..
1366 } => {
1367 if let Some(id) = sid {
1368 claude_session_id = Some(id);
1369 }
1370 cost_usd = cost;
1371 num_turns = turns;
1372
1373 if is_error == Some(true) {
1374 if let Some(err_msg) = error {
1375 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await;
1376 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await;
1377 return;
1378 }
1379 }
1380 }
1381 RawClaudeEvent::Unknown { event_type, data } => {
1382 match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1383 Ok(handle) => {
1384 let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await;
1385 }
1386 Err(_) => {
1387 let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1388 event_type,
1389 handle: "storage-failed".to_string(),
1390 data,
1391 }).await;
1392 }
1393 }
1394 }
1395 RawClaudeEvent::User { .. } => {}
1396 }
1397 }
1398
1399 let model_id = format!("claude-code-{}", config.model.as_str());
1401 let assistant_msg = if is_ephemeral {
1402 match storage.message_create_ephemeral(
1403 &session_id,
1404 MessageRole::Assistant,
1405 response_content,
1406 Some(model_id),
1407 None,
1408 None,
1409 cost_usd,
1410 ).await {
1411 Ok(m) => m,
1412 Err(e) => {
1413 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1414 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1415 return;
1416 }
1417 }
1418 } else {
1419 match storage.message_create(
1420 &session_id,
1421 MessageRole::Assistant,
1422 response_content,
1423 Some(model_id),
1424 None,
1425 None,
1426 cost_usd,
1427 ).await {
1428 Ok(m) => m,
1429 Err(e) => {
1430 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1431 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1432 return;
1433 }
1434 }
1435 };
1436
1437 let complete_event = NodeEvent::AssistantComplete { usage: None };
1439 if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, ¤t_parent, &complete_event).await {
1440 current_parent = node_id;
1441 }
1442
1443 let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1445 let assistant_node_id = if is_ephemeral {
1446 match storage.arbor().node_create_external_ephemeral(
1447 &config.head.tree_id,
1448 Some(current_parent),
1449 assistant_handle,
1450 None,
1451 ).await {
1452 Ok(id) => id,
1453 Err(e) => {
1454 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1455 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1456 return;
1457 }
1458 }
1459 } else {
1460 match storage.arbor().node_create_external(
1461 &config.head.tree_id,
1462 Some(current_parent),
1463 assistant_handle,
1464 None,
1465 ).await {
1466 Ok(id) => id,
1467 Err(e) => {
1468 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1469 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1470 return;
1471 }
1472 }
1473 };
1474
1475 let new_head = Position::new(config.head.tree_id, assistant_node_id);
1476
1477 if !is_ephemeral {
1479 if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1480 let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1481 let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1482 return;
1483 }
1484 }
1485
1486 let _ = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1488 new_head: if is_ephemeral { config.head } else { new_head },
1489 claude_session_id: claude_session_id.unwrap_or_default(),
1490 usage: Some(ChatUsage {
1491 input_tokens: None,
1492 output_tokens: None,
1493 cost_usd,
1494 num_turns,
1495 }),
1496 }).await;
1497
1498 let _ = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await;
1499 }
1500}