Skip to main content

plexus_substrate/activations/claudecode/
activation.rs

1use super::{
2    executor::{ClaudeCodeExecutor, LaunchConfig},
3    storage::ClaudeCodeStorage,
4    types::*,
5};
6use crate::plexus::{HubContext, NoParent};
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use plexus_macros::hub_methods;
10use serde_json::Value;
11use std::marker::PhantomData;
12use std::sync::{Arc, OnceLock};
13use tracing::Instrument;
14
15/// ClaudeCode activation - manages Claude Code sessions with Arbor-backed history
16///
17/// Generic over `P: HubContext` to allow different parent contexts:
18/// - `Weak<DynamicHub>` when registered with a DynamicHub
19/// - Custom context types for sub-hubs
20/// - `NoParent` for standalone testing
21#[derive(Clone)]
22pub struct ClaudeCode<P: HubContext = NoParent> {
23    storage: Arc<ClaudeCodeStorage>,
24    executor: ClaudeCodeExecutor,
25    /// Hub reference for resolving foreign handles when walking arbor trees
26    hub: Arc<OnceLock<P>>,
27    _phantom: PhantomData<P>,
28}
29
30impl<P: HubContext> ClaudeCode<P> {
31    /// Create a new ClaudeCode with a specific parent context type
32    pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
33        Self {
34            storage,
35            executor: ClaudeCodeExecutor::new(),
36            hub: Arc::new(OnceLock::new()),
37            _phantom: PhantomData,
38        }
39    }
40
41    /// Create with custom executor and parent context type
42    pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
43        Self {
44            storage,
45            executor,
46            hub: Arc::new(OnceLock::new()),
47            _phantom: PhantomData,
48        }
49    }
50
51    /// Inject parent context for resolving foreign handles
52    ///
53    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
54    /// This allows ClaudeCode to resolve handles from other activations when walking arbor trees.
55    pub fn inject_parent(&self, parent: P) {
56        let _ = self.hub.set(parent);
57    }
58
59    /// Check if parent context has been injected
60    pub fn has_parent(&self) -> bool {
61        self.hub.get().is_some()
62    }
63
64    /// Get a reference to the parent context
65    ///
66    /// Returns None if inject_parent hasn't been called yet.
67    pub fn parent(&self) -> Option<&P> {
68        self.hub.get()
69    }
70
71    /// Resolve a claudecode handle to its message content
72    ///
73    /// Called by the macro-generated resolve_handle method.
74    /// Handle format: {plugin_id}@1.0.0::chat:msg-{uuid}:{role}:{name}
75    pub async fn resolve_handle_impl(
76        &self,
77        handle: &crate::types::Handle,
78    ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
79        use crate::plexus::{PlexusError, wrap_stream};
80        use async_stream::stream;
81
82        let storage = self.storage.clone();
83
84        // Join meta parts into colon-separated identifier
85        // Format: "msg-{uuid}:{role}:{name}"
86        if handle.meta.is_empty() {
87            return Err(PlexusError::ExecutionError(
88                "ClaudeCode handle missing message ID in meta".to_string()
89            ));
90        }
91        let identifier = handle.meta.join(":");
92
93        // Extract name from meta if present (for response)
94        let name = handle.meta.get(2).cloned();
95
96        let result_stream = stream! {
97            match storage.resolve_message_handle(&identifier).await {
98                Ok(message) => {
99                    yield ResolveResult::Message {
100                        id: message.id.to_string(),
101                        role: message.role.as_str().to_string(),
102                        content: message.content,
103                        model: message.model_id,
104                        name: name.unwrap_or_else(|| message.role.as_str().to_string()),
105                    };
106                }
107                Err(e) => {
108                    yield ResolveResult::Error {
109                        message: format!("Failed to resolve handle: {}", e.message),
110                    };
111                }
112            }
113        };
114
115        Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
116    }
117}
118
119/// Convenience constructors for ClaudeCode with NoParent (standalone/testing)
120impl ClaudeCode<NoParent> {
121    pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
122        Self::with_context_type(storage)
123    }
124
125    pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
126        Self::with_executor_and_context(storage, executor)
127    }
128}
129
130#[hub_methods(
131    namespace = "claudecode",
132    version = "1.0.0",
133    description = "Manage Claude Code sessions with Arbor-backed conversation history",
134    resolve_handle
135)]
136impl<P: HubContext> ClaudeCode<P> {
137    /// Create a new Claude Code session
138    #[plexus_macros::hub_method(params(
139        name = "Human-readable name for the session",
140        working_dir = "Working directory for Claude Code",
141        model = "Model to use (opus, sonnet, haiku)",
142        system_prompt = "Optional system prompt / instructions",
143        loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval"
144    ))]
145    async fn create(
146        &self,
147        name: String,
148        working_dir: String,
149        model: Model,
150        system_prompt: Option<String>,
151        loopback_enabled: Option<bool>,
152    ) -> impl Stream<Item = CreateResult> + Send + 'static {
153        let storage = self.storage.clone();
154        let loopback = loopback_enabled.unwrap_or(false);
155
156        stream! {
157            match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None).await {
158                Ok(config) => {
159                    yield CreateResult::Ok {
160                        id: config.id,
161                        head: config.head,
162                    };
163                }
164                Err(e) => {
165                    yield CreateResult::Err { message: e.to_string() };
166                }
167            }
168        }
169    }
170
171    /// Chat with a session, streaming tokens like Cone
172    #[plexus_macros::hub_method(
173        streaming,
174        params(
175            name = "Session name to chat with",
176            prompt = "User message / prompt to send",
177            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
178        )
179    )]
180    async fn chat(
181        &self,
182        name: String,
183        prompt: String,
184        ephemeral: Option<bool>,
185    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
186        let storage = self.storage.clone();
187        let executor = self.executor.clone();
188
189        // Resolve before entering stream to avoid lifetime issues
190        let resolve_result = storage.session_get_by_name(&name).await;
191
192        stream! {
193            let is_ephemeral = ephemeral.unwrap_or(false);
194
195            // 1. Resolve and load session
196            let config = match resolve_result {
197                Ok(c) => c,
198                Err(e) => {
199                    yield ChatEvent::Err { message: e.to_string() };
200                    return;
201                }
202            };
203
204            let session_id = config.id;
205
206            // 2. Store user message in our database (ephemeral if requested)
207            let user_msg = if is_ephemeral {
208                match storage.message_create_ephemeral(
209                    &session_id,
210                    MessageRole::User,
211                    prompt.clone(),
212                    None, None, None, None,
213                ).await {
214                    Ok(m) => m,
215                    Err(e) => {
216                        yield ChatEvent::Err { message: e.to_string() };
217                        return;
218                    }
219                }
220            } else {
221                match storage.message_create(
222                    &session_id,
223                    MessageRole::User,
224                    prompt.clone(),
225                    None, None, None, None,
226                ).await {
227                    Ok(m) => m,
228                    Err(e) => {
229                        yield ChatEvent::Err { message: e.to_string() };
230                        return;
231                    }
232                }
233            };
234
235            // 3. Create user message node in Arbor (ephemeral if requested)
236            let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
237            let user_node_id = if is_ephemeral {
238                match storage.arbor().node_create_external_ephemeral(
239                    &config.head.tree_id,
240                    Some(config.head.node_id),
241                    user_handle,
242                    None,
243                ).await {
244                    Ok(id) => id,
245                    Err(e) => {
246                        yield ChatEvent::Err { message: e.to_string() };
247                        return;
248                    }
249                }
250            } else {
251                match storage.arbor().node_create_external(
252                    &config.head.tree_id,
253                    Some(config.head.node_id),
254                    user_handle,
255                    None,
256                ).await {
257                    Ok(id) => id,
258                    Err(e) => {
259                        yield ChatEvent::Err { message: e.to_string() };
260                        return;
261                    }
262                }
263            };
264
265            let user_position = Position::new(config.head.tree_id, user_node_id);
266
267            // 4. Emit Start
268            yield ChatEvent::Start {
269                id: session_id,
270                user_position,
271            };
272
273            // 5. Build launch config
274            let launch_config = LaunchConfig {
275                query: prompt,
276                session_id: config.claude_session_id.clone(),
277                fork_session: false,
278                model: config.model,
279                working_dir: config.working_dir.clone(),
280                system_prompt: config.system_prompt.clone(),
281                mcp_config: config.mcp_config.clone(),
282                loopback_enabled: config.loopback_enabled,
283                loopback_session_id: if config.loopback_enabled {
284                    Some(session_id.to_string())
285                } else {
286                    None
287                },
288                ..Default::default()
289            };
290
291            // 6. Launch Claude and stream events
292            let mut response_content = String::new();
293            let mut claude_session_id = config.claude_session_id.clone();
294            let mut cost_usd = None;
295            let mut num_turns = None;
296
297            let mut raw_stream = executor.launch(launch_config).await;
298
299            // Track current tool use for streaming tool input
300            let mut current_tool_id: Option<String> = None;
301            let mut current_tool_name: Option<String> = None;
302            let mut current_tool_input = String::new();
303
304            while let Some(event) = raw_stream.next().await {
305                match event {
306                    RawClaudeEvent::System { session_id: sid, .. } => {
307                        if let Some(id) = sid {
308                            claude_session_id = Some(id);
309                        }
310                    }
311                    RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
312                        if let Some(id) = sid {
313                            claude_session_id = Some(id);
314                        }
315                        match inner {
316                            StreamEventInner::ContentBlockDelta { delta, .. } => {
317                                match delta {
318                                    StreamDelta::TextDelta { text } => {
319                                        response_content.push_str(&text);
320                                        yield ChatEvent::Content { text };
321                                    }
322                                    StreamDelta::InputJsonDelta { partial_json } => {
323                                        current_tool_input.push_str(&partial_json);
324                                    }
325                                }
326                            }
327                            StreamEventInner::ContentBlockStart { content_block, .. } => {
328                                if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
329                                    current_tool_id = Some(id);
330                                    current_tool_name = Some(name);
331                                    current_tool_input.clear();
332                                }
333                            }
334                            StreamEventInner::ContentBlockStop { .. } => {
335                                // Emit tool use if we were building one
336                                if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
337                                    let input: Value = serde_json::from_str(&current_tool_input)
338                                        .unwrap_or(Value::Object(serde_json::Map::new()));
339                                    yield ChatEvent::ToolUse {
340                                        tool_name: name,
341                                        tool_use_id: id,
342                                        input,
343                                    };
344                                    current_tool_input.clear();
345                                }
346                            }
347                            _ => {}
348                        }
349                    }
350                    RawClaudeEvent::Assistant { message } => {
351                        // Still handle non-streaming assistant messages (tool results, etc.)
352                        if let Some(msg) = message {
353                            if let Some(content) = msg.content {
354                                for block in content {
355                                    match block {
356                                        RawContentBlock::Text { text } => {
357                                            // Only emit if we haven't already streamed this
358                                            if response_content.is_empty() {
359                                                response_content.push_str(&text);
360                                                yield ChatEvent::Content { text };
361                                            }
362                                        }
363                                        RawContentBlock::ToolUse { id, name, input } => {
364                                            yield ChatEvent::ToolUse {
365                                                tool_name: name,
366                                                tool_use_id: id,
367                                                input,
368                                            };
369                                        }
370                                        RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
371                                            yield ChatEvent::ToolResult {
372                                                tool_use_id,
373                                                output: content.unwrap_or_default(),
374                                                is_error: is_error.unwrap_or(false),
375                                            };
376                                        }
377                                        RawContentBlock::Thinking { thinking, .. } => {
378                                            yield ChatEvent::Thinking { thinking };
379                                        }
380                                    }
381                                }
382                            }
383                        }
384                    }
385                    RawClaudeEvent::Result {
386                        session_id: sid,
387                        cost_usd: cost,
388                        num_turns: turns,
389                        is_error,
390                        error,
391                        ..
392                    } => {
393                        if let Some(id) = sid {
394                            claude_session_id = Some(id);
395                        }
396                        cost_usd = cost;
397                        num_turns = turns;
398
399                        // Check for error
400                        if is_error == Some(true) {
401                            if let Some(err_msg) = error {
402                                yield ChatEvent::Err { message: err_msg };
403                                return;
404                            }
405                        }
406                    }
407                    RawClaudeEvent::Unknown { event_type, data } => {
408                        // Store unknown event and get handle
409                        match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
410                            Ok(handle) => {
411                                tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
412                                yield ChatEvent::Passthrough { event_type, handle, data };
413                            }
414                            Err(e) => {
415                                tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
416                                // Still forward the event even if storage fails
417                                yield ChatEvent::Passthrough {
418                                    event_type,
419                                    handle: "storage-failed".to_string(),
420                                    data,
421                                };
422                            }
423                        }
424                    }
425                    RawClaudeEvent::User { .. } => {
426                        // User events are echoed back but we don't need to process them
427                    }
428                }
429            }
430
431            // 7. Store assistant response (ephemeral if requested)
432            let model_id = format!("claude-code-{}", config.model.as_str());
433            let assistant_msg = if is_ephemeral {
434                match storage.message_create_ephemeral(
435                    &session_id,
436                    MessageRole::Assistant,
437                    response_content,
438                    Some(model_id),
439                    None,
440                    None,
441                    cost_usd,
442                ).await {
443                    Ok(m) => m,
444                    Err(e) => {
445                        yield ChatEvent::Err { message: e.to_string() };
446                        return;
447                    }
448                }
449            } else {
450                match storage.message_create(
451                    &session_id,
452                    MessageRole::Assistant,
453                    response_content,
454                    Some(model_id),
455                    None,
456                    None,
457                    cost_usd,
458                ).await {
459                    Ok(m) => m,
460                    Err(e) => {
461                        yield ChatEvent::Err { message: e.to_string() };
462                        return;
463                    }
464                }
465            };
466
467            // 8. Create assistant node in Arbor (ephemeral if requested)
468            let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
469            let assistant_node_id = if is_ephemeral {
470                match storage.arbor().node_create_external_ephemeral(
471                    &config.head.tree_id,
472                    Some(user_node_id),
473                    assistant_handle,
474                    None,
475                ).await {
476                    Ok(id) => id,
477                    Err(e) => {
478                        yield ChatEvent::Err { message: e.to_string() };
479                        return;
480                    }
481                }
482            } else {
483                match storage.arbor().node_create_external(
484                    &config.head.tree_id,
485                    Some(user_node_id),
486                    assistant_handle,
487                    None,
488                ).await {
489                    Ok(id) => id,
490                    Err(e) => {
491                        yield ChatEvent::Err { message: e.to_string() };
492                        return;
493                    }
494                }
495            };
496
497            let new_head = Position::new(config.head.tree_id, assistant_node_id);
498
499            // 9. Update session head and Claude session ID (skip for ephemeral)
500            if !is_ephemeral {
501                if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
502                    yield ChatEvent::Err { message: e.to_string() };
503                    return;
504                }
505            }
506
507            // 10. Emit Complete
508            // For ephemeral, new_head points to the ephemeral node (not the session's actual head)
509            yield ChatEvent::Complete {
510                new_head: if is_ephemeral { config.head } else { new_head },
511                claude_session_id: claude_session_id.unwrap_or_default(),
512                usage: Some(ChatUsage {
513                    input_tokens: None,
514                    output_tokens: None,
515                    cost_usd,
516                    num_turns,
517                }),
518            };
519        }
520    }
521
522    /// Get session configuration details
523    #[plexus_macros::hub_method]
524    async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
525        let result = self.storage.session_get_by_name(&name).await;
526
527        stream! {
528            match result {
529                Ok(config) => {
530                    yield GetResult::Ok { config };
531                }
532                Err(e) => {
533                    yield GetResult::Err { message: e.to_string() };
534                }
535            }
536        }
537    }
538
539    /// List all Claude Code sessions
540    #[plexus_macros::hub_method]
541    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
542        let storage = self.storage.clone();
543
544        stream! {
545            match storage.session_list().await {
546                Ok(sessions) => {
547                    yield ListResult::Ok { sessions };
548                }
549                Err(e) => {
550                    yield ListResult::Err { message: e.to_string() };
551                }
552            }
553        }
554    }
555
556    /// Delete a session
557    #[plexus_macros::hub_method]
558    async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
559        let storage = self.storage.clone();
560        let resolve_result = storage.session_get_by_name(&name).await;
561
562        stream! {
563            let config = match resolve_result {
564                Ok(c) => c,
565                Err(e) => {
566                    yield DeleteResult::Err { message: e.to_string() };
567                    return;
568                }
569            };
570
571            match storage.session_delete(&config.id).await {
572                Ok(_) => {
573                    yield DeleteResult::Ok { id: config.id };
574                }
575                Err(e) => {
576                    yield DeleteResult::Err { message: e.to_string() };
577                }
578            }
579        }
580    }
581
582    /// Fork a session to create a branch point
583    #[plexus_macros::hub_method]
584    async fn fork(
585        &self,
586        name: String,
587        new_name: String,
588    ) -> impl Stream<Item = ForkResult> + Send + 'static {
589        let storage = self.storage.clone();
590        let resolve_result = storage.session_get_by_name(&name).await;
591
592        stream! {
593            // Get parent session
594            let parent = match resolve_result {
595                Ok(c) => c,
596                Err(e) => {
597                    yield ForkResult::Err { message: e.to_string() };
598                    return;
599                }
600            };
601
602            // Create new session starting at parent's head position
603            // The new session will fork Claude's session on first chat
604            let new_config = match storage.session_create(
605                new_name,
606                parent.working_dir.clone(),
607                parent.model,
608                parent.system_prompt.clone(),
609                parent.mcp_config.clone(),
610                parent.loopback_enabled,
611                None,
612            ).await {
613                Ok(mut c) => {
614                    // Update head to parent's position (share the same tree point)
615                    // This creates a branch - the new session diverges from here
616                    if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
617                        yield ForkResult::Err { message: e.to_string() };
618                        return;
619                    }
620                    c.head = parent.head;
621                    c
622                }
623                Err(e) => {
624                    yield ForkResult::Err { message: e.to_string() };
625                    return;
626                }
627            };
628
629            yield ForkResult::Ok {
630                id: new_config.id,
631                head: new_config.head,
632            };
633        }
634    }
635
636    /// Start an async chat - returns immediately with stream_id for polling
637    ///
638    /// This is the non-blocking version of chat, designed for loopback scenarios
639    /// where the parent needs to poll for events and handle tool approvals.
640    #[plexus_macros::hub_method(
641        params(
642            name = "Session name to chat with",
643            prompt = "User message / prompt to send",
644            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
645        )
646    )]
647    async fn chat_async(
648        &self,
649        name: String,
650        prompt: String,
651        ephemeral: Option<bool>,
652    ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
653        let storage = self.storage.clone();
654        let executor = self.executor.clone();
655
656        // Resolve session before entering stream
657        let resolve_result = storage.session_get_by_name(&name).await;
658
659        stream! {
660            let is_ephemeral = ephemeral.unwrap_or(false);
661
662            // 1. Resolve session
663            let config = match resolve_result {
664                Ok(c) => c,
665                Err(e) => {
666                    yield ChatStartResult::Err { message: e.to_string() };
667                    return;
668                }
669            };
670
671            let session_id = config.id;
672
673            // 2. Create stream buffer
674            let stream_id = match storage.stream_create(session_id).await {
675                Ok(id) => id,
676                Err(e) => {
677                    yield ChatStartResult::Err { message: e.to_string() };
678                    return;
679                }
680            };
681
682            // 3. Spawn background task to run the chat
683            let storage_bg = storage.clone();
684            let executor_bg = executor.clone();
685            let prompt_bg = prompt.clone();
686            let config_bg = config.clone();
687            let stream_id_bg = stream_id;
688
689            tokio::spawn(async move {
690                Self::run_chat_background(
691                    storage_bg,
692                    executor_bg,
693                    config_bg,
694                    prompt_bg,
695                    is_ephemeral,
696                    stream_id_bg,
697                ).await;
698            }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
699
700            // 4. Return immediately with stream_id
701            yield ChatStartResult::Ok {
702                stream_id,
703                session_id,
704            };
705        }
706    }
707
708    /// Poll a stream for new events
709    ///
710    /// Returns events since the last poll (or from the specified offset).
711    /// Use this to read events from an async chat started with chat_async.
712    #[plexus_macros::hub_method(
713        params(
714            stream_id = "Stream ID returned from chat_async",
715            from_seq = "Optional: start reading from this sequence number",
716            limit = "Optional: max events to return (default 100)"
717        )
718    )]
719    async fn poll(
720        &self,
721        stream_id: StreamId,
722        from_seq: Option<u64>,
723        limit: Option<u64>,
724    ) -> impl Stream<Item = PollResult> + Send + 'static {
725        let storage = self.storage.clone();
726
727        stream! {
728            let limit_usize = limit.map(|l| l as usize);
729
730            match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
731                Ok((info, events)) => {
732                    let has_more = info.read_position < info.event_count;
733                    yield PollResult::Ok {
734                        status: info.status,
735                        events,
736                        read_position: info.read_position,
737                        total_events: info.event_count,
738                        has_more,
739                    };
740                }
741                Err(e) => {
742                    yield PollResult::Err { message: e.to_string() };
743                }
744            }
745        }
746    }
747
748    /// List active streams
749    ///
750    /// Returns all active streams, optionally filtered by session.
751    #[plexus_macros::hub_method(
752        params(
753            session_id = "Optional: filter by session ID"
754        )
755    )]
756    async fn streams(
757        &self,
758        session_id: Option<ClaudeCodeId>,
759    ) -> impl Stream<Item = StreamListResult> + Send + 'static {
760        let storage = self.storage.clone();
761
762        stream! {
763            let streams = if let Some(sid) = session_id {
764                storage.stream_list_for_session(&sid).await
765            } else {
766                storage.stream_list().await
767            };
768
769            yield StreamListResult::Ok { streams };
770        }
771    }
772}
773
774// Background task implementation (outside the hub_methods block)
775impl<P: HubContext> ClaudeCode<P> {
776    /// Run chat in background, pushing events to stream buffer
777    async fn run_chat_background(
778        storage: Arc<ClaudeCodeStorage>,
779        executor: ClaudeCodeExecutor,
780        config: ClaudeCodeConfig,
781        prompt: String,
782        is_ephemeral: bool,
783        stream_id: StreamId,
784    ) {
785        let session_id = config.id;
786
787        // 1. Store user message
788        let user_msg = if is_ephemeral {
789            match storage.message_create_ephemeral(
790                &session_id,
791                MessageRole::User,
792                prompt.clone(),
793                None, None, None, None,
794            ).await {
795                Ok(m) => m,
796                Err(e) => {
797                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
798                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
799                    return;
800                }
801            }
802        } else {
803            match storage.message_create(
804                &session_id,
805                MessageRole::User,
806                prompt.clone(),
807                None, None, None, None,
808            ).await {
809                Ok(m) => m,
810                Err(e) => {
811                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
812                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
813                    return;
814                }
815            }
816        };
817
818        // 2. Create user node in Arbor
819        let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
820        let user_node_id = if is_ephemeral {
821            match storage.arbor().node_create_external_ephemeral(
822                &config.head.tree_id,
823                Some(config.head.node_id),
824                user_handle,
825                None,
826            ).await {
827                Ok(id) => id,
828                Err(e) => {
829                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
830                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
831                    return;
832                }
833            }
834        } else {
835            match storage.arbor().node_create_external(
836                &config.head.tree_id,
837                Some(config.head.node_id),
838                user_handle,
839                None,
840            ).await {
841                Ok(id) => id,
842                Err(e) => {
843                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
844                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
845                    return;
846                }
847            }
848        };
849
850        let user_position = Position::new(config.head.tree_id, user_node_id);
851
852        // Update stream with user position
853        let _ = storage.stream_set_user_position(&stream_id, user_position).await;
854
855        // 3. Push Start event
856        let _ = storage.stream_push_event(&stream_id, ChatEvent::Start {
857            id: session_id,
858            user_position,
859        }).await;
860
861        // 4. Build launch config
862        let launch_config = LaunchConfig {
863            query: prompt,
864            session_id: config.claude_session_id.clone(),
865            fork_session: false,
866            model: config.model,
867            working_dir: config.working_dir.clone(),
868            system_prompt: config.system_prompt.clone(),
869            mcp_config: config.mcp_config.clone(),
870            loopback_enabled: config.loopback_enabled,
871            loopback_session_id: if config.loopback_enabled {
872                Some(session_id.to_string())
873            } else {
874                None
875            },
876            ..Default::default()
877        };
878
879        // 5. Launch Claude and stream events to buffer
880        let mut response_content = String::new();
881        let mut claude_session_id = config.claude_session_id.clone();
882        let mut cost_usd = None;
883        let mut num_turns = None;
884
885        let mut raw_stream = executor.launch(launch_config).await;
886
887        // Track current tool use for streaming
888        let mut current_tool_id: Option<String> = None;
889        let mut current_tool_name: Option<String> = None;
890        let mut current_tool_input = String::new();
891
892        while let Some(event) = raw_stream.next().await {
893            match event {
894                RawClaudeEvent::System { session_id: sid, .. } => {
895                    if let Some(id) = sid {
896                        claude_session_id = Some(id);
897                    }
898                }
899                RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
900                    if let Some(id) = sid {
901                        claude_session_id = Some(id);
902                    }
903                    match inner {
904                        StreamEventInner::ContentBlockDelta { delta, .. } => {
905                            match delta {
906                                StreamDelta::TextDelta { text } => {
907                                    response_content.push_str(&text);
908                                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
909                                }
910                                StreamDelta::InputJsonDelta { partial_json } => {
911                                    current_tool_input.push_str(&partial_json);
912                                }
913                            }
914                        }
915                        StreamEventInner::ContentBlockStart { content_block, .. } => {
916                            if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
917                                current_tool_id = Some(id);
918                                current_tool_name = Some(name);
919                                current_tool_input.clear();
920                            }
921                        }
922                        StreamEventInner::ContentBlockStop { .. } => {
923                            if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
924                                let input: Value = serde_json::from_str(&current_tool_input)
925                                    .unwrap_or(Value::Object(serde_json::Map::new()));
926
927                                // Check if this is a loopback_permit call (tool waiting for approval)
928                                if name == "mcp__plexus__loopback_permit" {
929                                    let _ = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await;
930                                }
931
932                                let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
933                                    tool_name: name,
934                                    tool_use_id: id,
935                                    input,
936                                }).await;
937                                current_tool_input.clear();
938                            }
939                        }
940                        StreamEventInner::MessageDelta { delta } => {
941                            // If stop_reason is tool_use with loopback, mark as awaiting
942                            if delta.stop_reason == Some("tool_use".to_string()) {
943                                // Check if we're in loopback mode (already marked above)
944                            }
945                        }
946                        _ => {}
947                    }
948                }
949                RawClaudeEvent::Assistant { message } => {
950                    if let Some(msg) = message {
951                        if let Some(content) = msg.content {
952                            for block in content {
953                                match block {
954                                    RawContentBlock::Text { text } => {
955                                        if response_content.is_empty() {
956                                            response_content.push_str(&text);
957                                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
958                                        }
959                                    }
960                                    RawContentBlock::ToolUse { id, name, input } => {
961                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
962                                            tool_name: name,
963                                            tool_use_id: id,
964                                            input,
965                                        }).await;
966                                    }
967                                    RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
968                                        // Tool completed - back to running if was awaiting
969                                        let _ = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await;
970                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
971                                            tool_use_id,
972                                            output: content.unwrap_or_default(),
973                                            is_error: is_error.unwrap_or(false),
974                                        }).await;
975                                    }
976                                    RawContentBlock::Thinking { thinking, .. } => {
977                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await;
978                                    }
979                                }
980                            }
981                        }
982                    }
983                }
984                RawClaudeEvent::Result {
985                    session_id: sid,
986                    cost_usd: cost,
987                    num_turns: turns,
988                    is_error,
989                    error,
990                    ..
991                } => {
992                    if let Some(id) = sid {
993                        claude_session_id = Some(id);
994                    }
995                    cost_usd = cost;
996                    num_turns = turns;
997
998                    if is_error == Some(true) {
999                        if let Some(err_msg) = error {
1000                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await;
1001                            let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await;
1002                            return;
1003                        }
1004                    }
1005                }
1006                RawClaudeEvent::Unknown { event_type, data } => {
1007                    match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1008                        Ok(handle) => {
1009                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await;
1010                        }
1011                        Err(_) => {
1012                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1013                                event_type,
1014                                handle: "storage-failed".to_string(),
1015                                data,
1016                            }).await;
1017                        }
1018                    }
1019                }
1020                RawClaudeEvent::User { .. } => {}
1021            }
1022        }
1023
1024        // 6. Store assistant response
1025        let model_id = format!("claude-code-{}", config.model.as_str());
1026        let assistant_msg = if is_ephemeral {
1027            match storage.message_create_ephemeral(
1028                &session_id,
1029                MessageRole::Assistant,
1030                response_content,
1031                Some(model_id),
1032                None,
1033                None,
1034                cost_usd,
1035            ).await {
1036                Ok(m) => m,
1037                Err(e) => {
1038                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1039                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1040                    return;
1041                }
1042            }
1043        } else {
1044            match storage.message_create(
1045                &session_id,
1046                MessageRole::Assistant,
1047                response_content,
1048                Some(model_id),
1049                None,
1050                None,
1051                cost_usd,
1052            ).await {
1053                Ok(m) => m,
1054                Err(e) => {
1055                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1056                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1057                    return;
1058                }
1059            }
1060        };
1061
1062        // 7. Create assistant node in Arbor
1063        let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1064        let assistant_node_id = if is_ephemeral {
1065            match storage.arbor().node_create_external_ephemeral(
1066                &config.head.tree_id,
1067                Some(user_node_id),
1068                assistant_handle,
1069                None,
1070            ).await {
1071                Ok(id) => id,
1072                Err(e) => {
1073                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1074                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1075                    return;
1076                }
1077            }
1078        } else {
1079            match storage.arbor().node_create_external(
1080                &config.head.tree_id,
1081                Some(user_node_id),
1082                assistant_handle,
1083                None,
1084            ).await {
1085                Ok(id) => id,
1086                Err(e) => {
1087                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1088                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1089                    return;
1090                }
1091            }
1092        };
1093
1094        let new_head = Position::new(config.head.tree_id, assistant_node_id);
1095
1096        // 8. Update session head (skip for ephemeral)
1097        if !is_ephemeral {
1098            if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1099                let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1100                let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1101                return;
1102            }
1103        }
1104
1105        // 9. Push Complete event and mark stream as complete
1106        let _ = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1107            new_head: if is_ephemeral { config.head } else { new_head },
1108            claude_session_id: claude_session_id.unwrap_or_default(),
1109            usage: Some(ChatUsage {
1110                input_tokens: None,
1111                output_tokens: None,
1112                cost_usd,
1113                num_turns,
1114            }),
1115        }).await;
1116
1117        let _ = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await;
1118    }
1119}