Skip to main content

plexus_substrate/activations/claudecode/
activation.rs

1use super::{
2    executor::{ClaudeCodeExecutor, LaunchConfig},
3    sessions,
4    storage::ClaudeCodeStorage,
5    types::*,
6};
7use crate::activations::arbor::{NodeId, TreeId};
8use crate::plexus::{HubContext, NoParent};
9use async_stream::stream;
10use futures::{Stream, StreamExt};
11use plexus_macros::activation;
12use serde_json::Value;
13use std::marker::PhantomData;
14use std::sync::{Arc, OnceLock};
15use tracing::Instrument;
16
17/// ClaudeCode activation - manages Claude Code sessions with Arbor-backed history
18///
19/// Generic over `P: HubContext` to allow different parent contexts:
20/// - `Weak<DynamicHub>` when registered with a DynamicHub
21/// - Custom context types for sub-hubs
22/// - `NoParent` for standalone testing
23#[derive(Clone)]
24pub struct ClaudeCode<P: HubContext = NoParent> {
25    pub storage: Arc<ClaudeCodeStorage>,
26    executor: ClaudeCodeExecutor,
27    /// Hub reference for resolving foreign handles when walking arbor trees
28    hub: Arc<OnceLock<P>>,
29    _phantom: PhantomData<P>,
30}
31
32impl<P: HubContext> ClaudeCode<P> {
33    /// Create a new ClaudeCode with a specific parent context type
34    pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
35        Self {
36            storage,
37            executor: ClaudeCodeExecutor::new(),
38            hub: Arc::new(OnceLock::new()),
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Create with custom executor and parent context type
44    pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
45        Self {
46            storage,
47            executor,
48            hub: Arc::new(OnceLock::new()),
49            _phantom: PhantomData,
50        }
51    }
52
53    /// Inject parent context for resolving foreign handles
54    ///
55    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
56    /// This allows ClaudeCode to resolve handles from other activations when walking arbor trees.
57    pub fn inject_parent(&self, parent: P) {
58        let _ = self.hub.set(parent);
59    }
60
61    /// Check if parent context has been injected
62    pub fn has_parent(&self) -> bool {
63        self.hub.get().is_some()
64    }
65
66    /// Get a reference to the parent context
67    ///
68    /// Returns None if inject_parent hasn't been called yet.
69    pub fn parent(&self) -> Option<&P> {
70        self.hub.get()
71    }
72
73    /// Resolve a claudecode handle to its message content
74    ///
75    /// Called by the macro-generated resolve_handle method.
76    /// Handle format: {plugin_id}@1.0.0::chat:msg-{uuid}:{role}:{name}
77    pub async fn resolve_handle_impl(
78        &self,
79        handle: &crate::types::Handle,
80    ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
81        use crate::plexus::{PlexusError, wrap_stream};
82        use async_stream::stream;
83
84        let storage = self.storage.clone();
85
86        // Join meta parts into colon-separated identifier
87        // Format: "msg-{uuid}:{role}:{name}"
88        if handle.meta.is_empty() {
89            return Err(PlexusError::ExecutionError(
90                "ClaudeCode handle missing message ID in meta".to_string()
91            ));
92        }
93        let identifier = handle.meta.join(":");
94
95        // Extract name from meta if present (for response)
96        let name = handle.meta.get(2).cloned();
97
98        let result_stream = stream! {
99            match storage.resolve_message_handle(&identifier).await {
100                Ok(message) => {
101                    yield ResolveResult::Message {
102                        id: message.id.to_string(),
103                        role: message.role.as_str().to_string(),
104                        content: message.content,
105                        model: message.model_id,
106                        name: name.unwrap_or_else(|| message.role.as_str().to_string()),
107                    };
108                }
109                Err(e) => {
110                    yield ResolveResult::Error {
111                        message: format!("Failed to resolve handle: {}", e),
112                    };
113                }
114            }
115        };
116
117        Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
118    }
119}
120
121/// Convenience constructors for ClaudeCode with NoParent (standalone/testing)
122impl ClaudeCode<NoParent> {
123    pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
124        Self::with_context_type(storage)
125    }
126
127    pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
128        Self::with_executor_and_context(storage, executor)
129    }
130}
131
132// ═══════════════════════════════════════════════════════════════════════════
133// ARBOR EVENT CAPTURE HELPERS (Milestone 2)
134// ═══════════════════════════════════════════════════════════════════════════
135
136/// Create an arbor text node for a chat event
137async fn create_event_node(
138    arbor: &crate::activations::arbor::ArborStorage,
139    tree_id: &crate::activations::arbor::TreeId,
140    parent_id: &crate::activations::arbor::NodeId,
141    event: &NodeEvent,
142) -> Result<crate::activations::arbor::NodeId, String> {
143    let json = serde_json::to_string(event)
144        .map_err(|e| format!("Failed to serialize event: {}", e))?;
145
146    arbor.node_create_text(tree_id, Some(*parent_id), json, None)
147        .await
148        .map_err(|e| e.to_string())
149}
150
151#[plexus_macros::activation(namespace = "claudecode",
152version = "1.0.0",
153description = "Manage Claude Code sessions with Arbor-backed conversation history",
154resolve_handle, crate_path = "plexus_core")]
155impl<P: HubContext> ClaudeCode<P> {
156    /// Create a new Claude Code session
157    #[plexus_macros::method(params(
158        name = "Human-readable name for the session",
159        working_dir = "Working directory for Claude Code",
160        model = "Model to use (opus, sonnet, haiku)",
161        system_prompt = "Optional system prompt / instructions",
162        loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval",
163        loopback_session_id = "Session ID for loopback MCP URL correlation (e.g., orcha-xxx-claude-yyy)"
164    ))]
165    pub async fn create(
166        &self,
167        name: String,
168        working_dir: String,
169        model: Model,
170        system_prompt: Option<String>,
171        loopback_enabled: Option<bool>,
172        loopback_session_id: Option<String>,
173    ) -> impl Stream<Item = CreateResult> + Send + 'static {
174        let storage = self.storage.clone();
175        let loopback = loopback_enabled.unwrap_or(false);
176
177        stream! {
178            // Resolve relative paths to absolute before storing
179            let working_dir = match std::fs::canonicalize(&working_dir) {
180                Ok(p) => p.to_string_lossy().into_owned(),
181                Err(e) => {
182                    yield CreateResult::Err {
183                        message: ClaudeCodeError::PathResolution {
184                            path: working_dir,
185                            source: e,
186                        }.to_string(),
187                    };
188                    return;
189                }
190            };
191
192            // Fail fast: if loopback is requested, the MCP server must be reachable.
193            // Without it Claude cannot resolve the permission-prompt tool and will
194            // return empty output instead of an error.
195            if loopback {
196                if let Err(e) = super::executor::check_mcp_reachable().await {
197                    yield CreateResult::Err { message: e };
198                    return;
199                }
200            }
201
202            // claude_session_id is None initially; populated after first chat with real Claude UUID
203            match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None, loopback_session_id, None).await {
204                Ok(config) => {
205                    yield CreateResult::Ok {
206                        id: config.id,
207                        head: config.head,
208                    };
209                }
210                Err(e) => {
211                    yield CreateResult::Err { message: e.to_string() };
212                }
213            }
214        }
215    }
216
217    /// Chat with a session, streaming tokens like Cone
218    #[plexus_macros::method(streaming,
219    params(
220        name = "Session name to chat with",
221        prompt = "User message / prompt to send",
222        ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion",
223        allowed_tools = "Optional list of tools to allow (e.g. [\"WebSearch\", \"Read\"])"
224    ))]
225    pub async fn chat(
226        &self,
227        name: String,
228        prompt: String,
229        ephemeral: Option<bool>,
230        allowed_tools: Option<Vec<String>>,
231    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
232        let storage = self.storage.clone();
233        let executor = self.executor.clone();
234
235        // Resolve before entering stream to avoid lifetime issues
236        let resolve_result = storage.session_get_by_name(&name).await;
237
238        stream! {
239            let is_ephemeral = ephemeral.unwrap_or(false);
240
241            // 1. Resolve and load session
242            let config = match resolve_result {
243                Ok(c) => c,
244                Err(e) => {
245                    yield ChatEvent::Err { message: e.to_string() };
246                    return;
247                }
248            };
249
250            let session_id = config.id;
251
252            // 2. Store user message in our database (ephemeral if requested)
253            let user_msg = if is_ephemeral {
254                match storage.message_create_ephemeral(
255                    &session_id,
256                    MessageRole::User,
257                    prompt.clone(),
258                    None, None, None, None,
259                ).await {
260                    Ok(m) => m,
261                    Err(e) => {
262                        yield ChatEvent::Err { message: e.to_string() };
263                        return;
264                    }
265                }
266            } else {
267                match storage.message_create(
268                    &session_id,
269                    MessageRole::User,
270                    prompt.clone(),
271                    None, None, None, None,
272                ).await {
273                    Ok(m) => m,
274                    Err(e) => {
275                        yield ChatEvent::Err { message: e.to_string() };
276                        return;
277                    }
278                }
279            };
280
281            // 3. Create user message node in Arbor (ephemeral if requested)
282            let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
283            let user_node_id = if is_ephemeral {
284                match storage.arbor().node_create_external_ephemeral(
285                    &config.head.tree_id,
286                    Some(config.head.node_id),
287                    user_handle,
288                    None,
289                ).await {
290                    Ok(id) => id,
291                    Err(e) => {
292                        yield ChatEvent::Err { message: e.to_string() };
293                        return;
294                    }
295                }
296            } else {
297                match storage.arbor().node_create_external(
298                    &config.head.tree_id,
299                    Some(config.head.node_id),
300                    user_handle,
301                    None,
302                ).await {
303                    Ok(id) => id,
304                    Err(e) => {
305                        yield ChatEvent::Err { message: e.to_string() };
306                        return;
307                    }
308                }
309            };
310
311            let user_position = Position::new(config.head.tree_id, user_node_id);
312
313            // Track current parent for event node chain (Milestone 2)
314            let mut current_parent = user_node_id;
315
316            // Create UserMessage event node (Milestone 2)
317            let user_event = NodeEvent::UserMessage { content: prompt.clone() };
318            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &user_event).await {
319                current_parent = node_id;
320            }
321
322            // Create AssistantStart event node (Milestone 2)
323            let start_event = NodeEvent::AssistantStart;
324            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &start_event).await {
325                current_parent = node_id;
326            }
327
328            // 4. Emit Start
329            yield ChatEvent::Start {
330                id: session_id,
331                user_position,
332            };
333
334            // 5. Build launch config
335            let launch_config = LaunchConfig {
336                query: prompt,
337                // Use stored Claude UUID for --resume (None on first call = new session)
338                session_id: config.claude_session_id.clone(),
339                fork_session: false,
340                model: config.model,
341                working_dir: config.working_dir.clone(),
342                system_prompt: config.system_prompt.clone(),
343                mcp_config: config.mcp_config.clone(),
344                loopback_enabled: config.loopback_enabled,
345                // Use loopback_session_id for MCP URL correlation (e.g., orcha-xxx-claude-yyy)
346                loopback_session_id: if config.loopback_enabled {
347                    config.loopback_session_id.clone()
348                } else {
349                    None
350                },
351                allowed_tools: allowed_tools.unwrap_or_default(),
352                ..Default::default()
353            };
354
355            // 6. Launch Claude and stream events
356            let prev_claude_session_id = config.claude_session_id.clone();
357            let mut response_content = String::new();
358            let mut claude_session_id = config.claude_session_id.clone();
359            let mut cost_usd = None;
360            let mut num_turns = None;
361
362            let mut raw_stream = executor.launch(launch_config).await;
363
364            // Track current tool use for streaming tool input
365            let mut current_tool_id: Option<String> = None;
366            let mut current_tool_name: Option<String> = None;
367            let mut current_tool_input = String::new();
368
369            while let Some(event) = raw_stream.next().await {
370                match event {
371                    RawClaudeEvent::System { session_id: sid, .. } => {
372                        if let Some(id) = sid {
373                            claude_session_id = Some(id);
374                        }
375                    }
376                    RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
377                        if let Some(id) = sid {
378                            claude_session_id = Some(id);
379                        }
380                        match inner {
381                            StreamEventInner::ContentBlockDelta { delta, .. } => {
382                                match delta {
383                                    StreamDelta::TextDelta { text } => {
384                                        response_content.push_str(&text);
385
386                                        // Create arbor node for text content (Milestone 2)
387                                        let event = NodeEvent::ContentText { text: text.clone() };
388                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
389                                            current_parent = node_id;
390                                        }
391
392                                        yield ChatEvent::Content { text };
393                                    }
394                                    StreamDelta::InputJsonDelta { partial_json } => {
395                                        current_tool_input.push_str(&partial_json);
396                                    }
397                                }
398                            }
399                            StreamEventInner::ContentBlockStart { content_block, .. } => {
400                                if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
401                                    current_tool_id = Some(id);
402                                    current_tool_name = Some(name);
403                                    current_tool_input.clear();
404                                }
405                            }
406                            StreamEventInner::ContentBlockStop { .. } => {
407                                // Emit tool use if we were building one
408                                if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
409                                    let input: Value = serde_json::from_str(&current_tool_input)
410                                        .unwrap_or(Value::Object(serde_json::Map::new()));
411
412                                    // Create arbor node for tool use (Milestone 2)
413                                    let event = NodeEvent::ContentToolUse {
414                                        id: id.clone(),
415                                        name: name.clone(),
416                                        input: input.clone(),
417                                    };
418                                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
419                                        current_parent = node_id;
420                                    }
421
422                                    yield ChatEvent::ToolUse {
423                                        tool_name: name,
424                                        tool_use_id: id,
425                                        input,
426                                    };
427                                    current_tool_input.clear();
428                                }
429                            }
430                            _ => {}
431                        }
432                    }
433                    RawClaudeEvent::Assistant { message } => {
434                        // Still handle non-streaming assistant messages (tool results, etc.)
435                        if let Some(msg) = message {
436                            if let Some(content) = msg.content {
437                                for block in content {
438                                    match block {
439                                        RawContentBlock::Text { text } => {
440                                            // Only emit if we haven't already streamed this
441                                            if response_content.is_empty() {
442                                                response_content.push_str(&text);
443
444                                                // Create arbor node for text content (Milestone 2)
445                                                let event = NodeEvent::ContentText { text: text.clone() };
446                                                if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
447                                                    current_parent = node_id;
448                                                }
449
450                                                yield ChatEvent::Content { text };
451                                            }
452                                        }
453                                        RawContentBlock::ToolUse { id, name, input } => {
454                                            // Create arbor node for tool use (Milestone 2)
455                                            let event = NodeEvent::ContentToolUse {
456                                                id: id.clone(),
457                                                name: name.clone(),
458                                                input: input.clone(),
459                                            };
460                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
461                                                current_parent = node_id;
462                                            }
463
464                                            yield ChatEvent::ToolUse {
465                                                tool_name: name,
466                                                tool_use_id: id,
467                                                input,
468                                            };
469                                        }
470                                        RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
471                                            // Create arbor node for tool result (Milestone 2)
472                                            let event = NodeEvent::UserToolResult {
473                                                tool_use_id: tool_use_id.clone(),
474                                                content: content.clone().unwrap_or_default(),
475                                                is_error: is_error.unwrap_or(false),
476                                            };
477                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
478                                                current_parent = node_id;
479                                            }
480
481                                            yield ChatEvent::ToolResult {
482                                                tool_use_id,
483                                                output: content.unwrap_or_default(),
484                                                is_error: is_error.unwrap_or(false),
485                                            };
486                                        }
487                                        RawContentBlock::Thinking { thinking, .. } => {
488                                            // Create arbor node for thinking (Milestone 2)
489                                            let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
490                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
491                                                current_parent = node_id;
492                                            }
493
494                                            yield ChatEvent::Thinking { thinking };
495                                        }
496                                    }
497                                }
498                            }
499                        }
500                    }
501                    RawClaudeEvent::Result {
502                        session_id: sid,
503                        cost_usd: cost,
504                        num_turns: turns,
505                        is_error,
506                        error,
507                        ..
508                    } => {
509                        if let Some(id) = sid {
510                            claude_session_id = Some(id);
511                        }
512                        cost_usd = cost;
513                        num_turns = turns;
514
515                        // Check for error
516                        if is_error == Some(true) {
517                            if let Some(err_msg) = error {
518                                yield ChatEvent::Err { message: err_msg };
519                                return;
520                            }
521                        }
522                    }
523                    RawClaudeEvent::Unknown { event_type, data } => {
524                        // Store unknown event and get handle
525                        match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
526                            Ok(handle) => {
527                                tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
528                                yield ChatEvent::Passthrough { event_type, handle, data };
529                            }
530                            Err(e) => {
531                                tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
532                                // Still forward the event even if storage fails
533                                yield ChatEvent::Passthrough {
534                                    event_type,
535                                    handle: "storage-failed".to_string(),
536                                    data,
537                                };
538                            }
539                        }
540                    }
541                    RawClaudeEvent::User { .. } => {
542                        // User events are echoed back but we don't need to process them
543                    }
544                    RawClaudeEvent::LaunchCommand { command } => {
545                        let event = NodeEvent::LaunchCommand { command };
546                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
547                            current_parent = node_id;
548                        }
549                    }
550                    RawClaudeEvent::Stderr { text } => {
551                        tracing::warn!(stderr = %text, "Claude stderr");
552                        let event = NodeEvent::ClaudeStderr { text };
553                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
554                            current_parent = node_id;
555                        }
556                    }
557                }
558            }
559
560            // 6b. If we captured a new Claude session UUID, persist it for future --resume
561            if let Some(ref new_id) = claude_session_id {
562                if prev_claude_session_id.as_deref() != Some(new_id.as_str()) {
563                    let _ = storage.session_update_claude_id(&session_id, new_id.clone()).await;
564                }
565            }
566
567            // Guard: if stream produced nothing, emit error instead of ghost Complete
568            if response_content.is_empty() && claude_session_id.is_none() {
569                yield ChatEvent::Err {
570                    message: "Claude process produced no response. Check substrate logs for details.".to_string(),
571                };
572                return;
573            }
574
575            // 7. Store assistant response (ephemeral if requested)
576            let model_id = format!("claude-code-{}", config.model.as_str());
577            let assistant_msg = if is_ephemeral {
578                match storage.message_create_ephemeral(
579                    &session_id,
580                    MessageRole::Assistant,
581                    response_content,
582                    Some(model_id),
583                    None,
584                    None,
585                    cost_usd,
586                ).await {
587                    Ok(m) => m,
588                    Err(e) => {
589                        yield ChatEvent::Err { message: e.to_string() };
590                        return;
591                    }
592                }
593            } else {
594                match storage.message_create(
595                    &session_id,
596                    MessageRole::Assistant,
597                    response_content,
598                    Some(model_id),
599                    None,
600                    None,
601                    cost_usd,
602                ).await {
603                    Ok(m) => m,
604                    Err(e) => {
605                        yield ChatEvent::Err { message: e.to_string() };
606                        return;
607                    }
608                }
609            };
610
611            // Create AssistantComplete event node (Milestone 2)
612            let complete_event = NodeEvent::AssistantComplete { usage: None };
613            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &complete_event).await {
614                current_parent = node_id;
615            }
616
617            // 8. Create assistant node in Arbor (ephemeral if requested)
618            let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
619            let assistant_node_id = if is_ephemeral {
620                match storage.arbor().node_create_external_ephemeral(
621                    &config.head.tree_id,
622                    Some(current_parent),
623                    assistant_handle,
624                    None,
625                ).await {
626                    Ok(id) => id,
627                    Err(e) => {
628                        yield ChatEvent::Err { message: e.to_string() };
629                        return;
630                    }
631                }
632            } else {
633                match storage.arbor().node_create_external(
634                    &config.head.tree_id,
635                    Some(current_parent),
636                    assistant_handle,
637                    None,
638                ).await {
639                    Ok(id) => id,
640                    Err(e) => {
641                        yield ChatEvent::Err { message: e.to_string() };
642                        return;
643                    }
644                }
645            };
646
647            let new_head = Position::new(config.head.tree_id, assistant_node_id);
648
649            // 9. Update session head and Claude session ID (skip for ephemeral)
650            if !is_ephemeral {
651                if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
652                    yield ChatEvent::Err { message: e.to_string() };
653                    return;
654                }
655            }
656
657            // 10. Emit Complete
658            // For ephemeral, new_head points to the ephemeral node (not the session's actual head)
659            yield ChatEvent::Complete {
660                new_head: if is_ephemeral { config.head } else { new_head },
661                claude_session_id: claude_session_id.unwrap_or_default(),
662                usage: Some(ChatUsage {
663                    input_tokens: None,
664                    output_tokens: None,
665                    cost_usd,
666                    num_turns,
667                }),
668            };
669        }
670    }
671
672    /// Get session configuration details
673    #[plexus_macros::method]
674    async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
675        let result = self.storage.session_get_by_name(&name).await;
676
677        stream! {
678            match result {
679                Ok(config) => {
680                    yield GetResult::Ok { config };
681                }
682                Err(e) => {
683                    yield GetResult::Err { message: e.to_string() };
684                }
685            }
686        }
687    }
688
689    /// List all Claude Code sessions
690    #[plexus_macros::method]
691    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
692        let storage = self.storage.clone();
693
694        stream! {
695            match storage.session_list().await {
696                Ok(sessions) => {
697                    yield ListResult::Ok { sessions };
698                }
699                Err(e) => {
700                    yield ListResult::Err { message: e.to_string() };
701                }
702            }
703        }
704    }
705
706    /// Delete a session
707    #[plexus_macros::method]
708    async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
709        let storage = self.storage.clone();
710        let resolve_result = storage.session_get_by_name(&name).await;
711
712        stream! {
713            let config = match resolve_result {
714                Ok(c) => c,
715                Err(e) => {
716                    yield DeleteResult::Err { message: e.to_string() };
717                    return;
718                }
719            };
720
721            match storage.session_delete(&config.id).await {
722                Ok(_) => {
723                    yield DeleteResult::Ok { id: config.id };
724                }
725                Err(e) => {
726                    yield DeleteResult::Err { message: e.to_string() };
727                }
728            }
729        }
730    }
731
732    /// Fork a session to create a branch point
733    #[plexus_macros::method]
734    async fn fork(
735        &self,
736        name: String,
737        new_name: String,
738    ) -> impl Stream<Item = ForkResult> + Send + 'static {
739        let storage = self.storage.clone();
740        let resolve_result = storage.session_get_by_name(&name).await;
741
742        stream! {
743            // Get parent session
744            let parent = match resolve_result {
745                Ok(c) => c,
746                Err(e) => {
747                    yield ForkResult::Err { message: e.to_string() };
748                    return;
749                }
750            };
751
752            // Create new session starting at parent's head position
753            // The new session will fork Claude's session on first chat
754            let new_config = match storage.session_create(
755                new_name,
756                parent.working_dir.clone(),
757                parent.model,
758                parent.system_prompt.clone(),
759                parent.mcp_config.clone(),
760                parent.loopback_enabled,
761                None, // claude_session_id - will be set on first chat with fork_session=true
762                None, // loopback_session_id
763                None, // metadata
764            ).await {
765                Ok(mut c) => {
766                    // Update head to parent's position (share the same tree point)
767                    // This creates a branch - the new session diverges from here
768                    if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
769                        yield ForkResult::Err { message: e.to_string() };
770                        return;
771                    }
772                    c.head = parent.head;
773                    c
774                }
775                Err(e) => {
776                    yield ForkResult::Err { message: e.to_string() };
777                    return;
778                }
779            };
780
781            yield ForkResult::Ok {
782                id: new_config.id,
783                head: new_config.head,
784            };
785        }
786    }
787
788    /// Start an async chat - returns immediately with stream_id for polling
789    ///
790    /// This is the non-blocking version of chat, designed for loopback scenarios
791    /// where the parent needs to poll for events and handle tool approvals.
792    #[plexus_macros::method(params(
793        name = "Session name to chat with",
794        prompt = "User message / prompt to send",
795        ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
796    ))]
797    async fn chat_async(
798        &self,
799        name: String,
800        prompt: String,
801        ephemeral: Option<bool>,
802    ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
803        let storage = self.storage.clone();
804        let executor = self.executor.clone();
805
806        // Resolve session before entering stream
807        let resolve_result = storage.session_get_by_name(&name).await;
808
809        stream! {
810            let is_ephemeral = ephemeral.unwrap_or(false);
811
812            // 1. Resolve session
813            let config = match resolve_result {
814                Ok(c) => c,
815                Err(e) => {
816                    yield ChatStartResult::Err { message: e.to_string() };
817                    return;
818                }
819            };
820
821            let session_id = config.id;
822
823            // 2. Create stream buffer
824            let stream_id = match storage.stream_create(session_id).await {
825                Ok(id) => id,
826                Err(e) => {
827                    yield ChatStartResult::Err { message: e.to_string() };
828                    return;
829                }
830            };
831
832            // 3. Spawn background task to run the chat
833            let storage_bg = storage.clone();
834            let executor_bg = executor.clone();
835            let prompt_bg = prompt.clone();
836            let config_bg = config.clone();
837            let stream_id_bg = stream_id;
838
839            tokio::spawn(async move {
840                Self::run_chat_background(
841                    storage_bg,
842                    executor_bg,
843                    config_bg,
844                    prompt_bg,
845                    is_ephemeral,
846                    stream_id_bg,
847                ).await;
848            }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
849
850            // 4. Return immediately with stream_id
851            yield ChatStartResult::Ok {
852                stream_id,
853                session_id,
854            };
855        }
856    }
857
858    /// Poll a stream for new events
859    ///
860    /// Returns events since the last poll (or from the specified offset).
861    /// Use this to read events from an async chat started with chat_async.
862    #[plexus_macros::method(params(
863        stream_id = "Stream ID returned from chat_async",
864        from_seq = "Optional: start reading from this sequence number",
865        limit = "Optional: max events to return (default 100)"
866    ))]
867    async fn poll(
868        &self,
869        stream_id: StreamId,
870        from_seq: Option<u64>,
871        limit: Option<u64>,
872    ) -> impl Stream<Item = PollResult> + Send + 'static {
873        let storage = self.storage.clone();
874
875        stream! {
876            let limit_usize = limit.map(|l| l as usize);
877
878            match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
879                Ok((info, events)) => {
880                    let has_more = info.read_position < info.event_count;
881                    yield PollResult::Ok {
882                        status: info.status,
883                        events,
884                        read_position: info.read_position,
885                        total_events: info.event_count,
886                        has_more,
887                    };
888                }
889                Err(e) => {
890                    yield PollResult::Err { message: e.to_string() };
891                }
892            }
893        }
894    }
895
896    /// List active streams
897    ///
898    /// Returns all active streams, optionally filtered by session.
899    #[plexus_macros::method(params(
900        session_id = "Optional: filter by session ID"
901    ))]
902    async fn streams(
903        &self,
904        session_id: Option<ClaudeCodeId>,
905    ) -> impl Stream<Item = StreamListResult> + Send + 'static {
906        let storage = self.storage.clone();
907
908        stream! {
909            let streams = if let Some(sid) = session_id {
910                storage.stream_list_for_session(&sid).await
911            } else {
912                storage.stream_list().await
913            };
914
915            yield StreamListResult::Ok { streams };
916        }
917    }
918
919    /// Get arbor tree information for a session
920    #[plexus_macros::method(params(
921        name = "Session name"
922    ))]
923    async fn get_tree(
924        &self,
925        name: String,
926    ) -> impl Stream<Item = GetTreeResult> + Send + 'static {
927        let storage = self.storage.clone();
928
929        stream! {
930            let config = match storage.session_get_by_name(&name).await {
931                Ok(c) => c,
932                Err(e) => {
933                    yield GetTreeResult::Err { message: e.to_string() };
934                    return;
935                }
936            };
937
938            yield GetTreeResult::Ok {
939                tree_id: config.head.tree_id,
940                head: config.head.node_id,
941            };
942        }
943    }
944
945    /// Render arbor tree as Claude API messages
946    #[plexus_macros::method(params(
947        name = "Session name",
948        start = "Optional start node (default: root)",
949        end = "Optional end node (default: head)"
950    ))]
951    async fn render_context(
952        &self,
953        name: String,
954        start: Option<NodeId>,
955        end: Option<NodeId>,
956    ) -> impl Stream<Item = RenderResult> + Send + 'static {
957        let storage = self.storage.clone();
958
959        stream! {
960            // Get session config
961            let config = match storage.session_get_by_name(&name).await {
962                Ok(c) => c,
963                Err(e) => {
964                    yield RenderResult::Err { message: e.to_string() };
965                    return;
966                }
967            };
968
969            let tree_id = config.head.tree_id;
970            let end_node = end.unwrap_or(config.head.node_id);
971
972            // Get root if start not specified
973            let start_node = if let Some(s) = start {
974                s
975            } else {
976                match storage.arbor().tree_get(&tree_id).await {
977                    Ok(tree) => tree.root,
978                    Err(e) => {
979                        yield RenderResult::Err { message: e.to_string() };
980                        return;
981                    }
982                }
983            };
984
985            // Render messages
986            let messages = match storage.render_messages(&tree_id, &start_node, &end_node).await {
987                Ok(m) => m,
988                Err(e) => {
989                    yield RenderResult::Err { message: e.to_string() };
990                    return;
991                }
992            };
993
994            yield RenderResult::Ok { messages };
995        }
996    }
997
998    /// List all session files for a project
999    #[plexus_macros::method(params(
1000        project_path = "Project path (e.g., '-workspace-hypermemetic-hub-codegen')"
1001    ))]
1002    async fn sessions_list(
1003        &self,
1004        project_path: String,
1005    ) -> impl Stream<Item = SessionsListResult> + Send + 'static {
1006        stream! {
1007            match sessions::list_sessions(&project_path).await {
1008                Ok(sessions) => {
1009                    yield SessionsListResult::Ok { sessions };
1010                }
1011                Err(e) => {
1012                    yield SessionsListResult::Err { message: e };
1013                }
1014            }
1015        }
1016    }
1017
1018    /// Get events from a session file
1019    #[plexus_macros::method(params(
1020        project_path = "Project path",
1021        session_id = "Session ID (UUID)"
1022    ))]
1023    async fn sessions_get(
1024        &self,
1025        project_path: String,
1026        session_id: String,
1027    ) -> impl Stream<Item = SessionsGetResult> + Send + 'static {
1028        stream! {
1029            match sessions::read_session(&project_path, &session_id).await {
1030                Ok(events) => {
1031                    let event_count = events.len();
1032                    // Convert to JSON values for transport
1033                    let events_json: Vec<serde_json::Value> = events.into_iter()
1034                        .filter_map(|e| serde_json::to_value(e).ok())
1035                        .collect();
1036
1037                    yield SessionsGetResult::Ok {
1038                        session_id: session_id.clone(),
1039                        event_count,
1040                        events: events_json,
1041                    };
1042                }
1043                Err(e) => {
1044                    yield SessionsGetResult::Err { message: e };
1045                }
1046            }
1047        }
1048    }
1049
1050    /// Import a session file into arbor
1051    #[plexus_macros::method(params(
1052        project_path = "Project path",
1053        session_id = "Session ID to import",
1054        owner_id = "Owner ID for the new tree (default: 'claudecode')"
1055    ))]
1056    async fn sessions_import(
1057        &self,
1058        project_path: String,
1059        session_id: String,
1060        owner_id: Option<String>,
1061    ) -> impl Stream<Item = SessionsImportResult> + Send + 'static {
1062        let storage = self.storage.clone();
1063
1064        stream! {
1065            let owner = owner_id.unwrap_or_else(|| "claudecode".to_string());
1066
1067            match sessions::import_to_arbor(storage.arbor(), &project_path, &session_id, &owner).await {
1068                Ok(tree_id) => {
1069                    yield SessionsImportResult::Ok {
1070                        tree_id,
1071                        session_id,
1072                    };
1073                }
1074                Err(e) => {
1075                    yield SessionsImportResult::Err { message: e };
1076                }
1077            }
1078        }
1079    }
1080
1081    /// Export an arbor tree to a session file
1082    #[plexus_macros::method(params(
1083        tree_id = "Arbor tree ID to export",
1084        project_path = "Project path",
1085        session_id = "Session ID for the exported file"
1086    ))]
1087    async fn sessions_export(
1088        &self,
1089        tree_id: TreeId,
1090        project_path: String,
1091        session_id: String,
1092    ) -> impl Stream<Item = SessionsExportResult> + Send + 'static {
1093        let storage = self.storage.clone();
1094
1095        stream! {
1096            match sessions::export_from_arbor(storage.arbor(), &tree_id, &project_path, &session_id).await {
1097                Ok(()) => {
1098                    yield SessionsExportResult::Ok {
1099                        tree_id,
1100                        session_id,
1101                    };
1102                }
1103                Err(e) => {
1104                    yield SessionsExportResult::Err { message: e };
1105                }
1106            }
1107        }
1108    }
1109
1110    /// Delete a session file
1111    #[plexus_macros::method(params(
1112        project_path = "Project path",
1113        session_id = "Session ID to delete"
1114    ))]
1115    async fn sessions_delete(
1116        &self,
1117        project_path: String,
1118        session_id: String,
1119    ) -> impl Stream<Item = SessionsDeleteResult> + Send + 'static {
1120        stream! {
1121            match sessions::delete_session(&project_path, &session_id).await {
1122                Ok(()) => {
1123                    yield SessionsDeleteResult::Ok {
1124                        session_id,
1125                        deleted: true,
1126                    };
1127                }
1128                Err(e) => {
1129                    yield SessionsDeleteResult::Err { message: e };
1130                }
1131            }
1132        }
1133    }
1134}
1135
1136// Background task implementation (outside the hub_methods block)
1137impl<P: HubContext> ClaudeCode<P> {
1138    /// Run chat in background, pushing events to stream buffer
1139    async fn run_chat_background(
1140        storage: Arc<ClaudeCodeStorage>,
1141        executor: ClaudeCodeExecutor,
1142        config: ClaudeCodeConfig,
1143        prompt: String,
1144        is_ephemeral: bool,
1145        stream_id: StreamId,
1146    ) {
1147        let session_id = config.id;
1148
1149        // 1. Store user message
1150        let user_msg = if is_ephemeral {
1151            match storage.message_create_ephemeral(
1152                &session_id,
1153                MessageRole::User,
1154                prompt.clone(),
1155                None, None, None, None,
1156            ).await {
1157                Ok(m) => m,
1158                Err(e) => {
1159                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1160                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1161                    }
1162                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1163                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1164                    }
1165                    return;
1166                }
1167            }
1168        } else {
1169            match storage.message_create(
1170                &session_id,
1171                MessageRole::User,
1172                prompt.clone(),
1173                None, None, None, None,
1174            ).await {
1175                Ok(m) => m,
1176                Err(e) => {
1177                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1178                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1179                    }
1180                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1181                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1182                    }
1183                    return;
1184                }
1185            }
1186        };
1187
1188        // 2. Create user node in Arbor
1189        let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
1190        let user_node_id = if is_ephemeral {
1191            match storage.arbor().node_create_external_ephemeral(
1192                &config.head.tree_id,
1193                Some(config.head.node_id),
1194                user_handle,
1195                None,
1196            ).await {
1197                Ok(id) => id,
1198                Err(e) => {
1199                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1200                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1201                    }
1202                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1203                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1204                    }
1205                    return;
1206                }
1207            }
1208        } else {
1209            match storage.arbor().node_create_external(
1210                &config.head.tree_id,
1211                Some(config.head.node_id),
1212                user_handle,
1213                None,
1214            ).await {
1215                Ok(id) => id,
1216                Err(e) => {
1217                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1218                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1219                    }
1220                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1221                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1222                    }
1223                    return;
1224                }
1225            }
1226        };
1227
1228        let user_position = Position::new(config.head.tree_id, user_node_id);
1229
1230        // Track current parent for event node chain (Milestone 2)
1231        let mut current_parent = user_node_id;
1232
1233        // Create UserMessage event node (Milestone 2)
1234        let user_event = NodeEvent::UserMessage { content: prompt.clone() };
1235        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &user_event).await {
1236            current_parent = node_id;
1237        }
1238
1239        // Create AssistantStart event node (Milestone 2)
1240        let start_event = NodeEvent::AssistantStart;
1241        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &start_event).await {
1242            current_parent = node_id;
1243        }
1244
1245        // Update stream with user position
1246        if let Err(e) = storage.stream_set_user_position(&stream_id, user_position).await {
1247            tracing::error!(stream_id = %stream_id, error = %e, "Failed to set user position on stream");
1248        }
1249
1250        // 3. Push Start event
1251        if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Start {
1252            id: session_id,
1253            user_position,
1254        }).await {
1255            tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1256        }
1257
1258        // 4. Build launch config
1259        let launch_config = LaunchConfig {
1260            query: prompt,
1261            session_id: config.claude_session_id.clone(),
1262            fork_session: false,
1263            model: config.model,
1264            working_dir: config.working_dir.clone(),
1265            system_prompt: config.system_prompt.clone(),
1266            mcp_config: config.mcp_config.clone(),
1267            loopback_enabled: config.loopback_enabled,
1268            // Use claude_session_id for MCP URL transparency (e.g., orcha-xxx)
1269            loopback_session_id: if config.loopback_enabled {
1270                config.claude_session_id.clone()
1271            } else {
1272                None
1273            },
1274            ..Default::default()
1275        };
1276
1277        // 5. Launch Claude and stream events to buffer
1278        let mut response_content = String::new();
1279        let mut claude_session_id = config.claude_session_id.clone();
1280        let mut cost_usd = None;
1281        let mut num_turns = None;
1282
1283        let mut raw_stream = executor.launch(launch_config).await;
1284
1285        // Track current tool use for streaming
1286        let mut current_tool_id: Option<String> = None;
1287        let mut current_tool_name: Option<String> = None;
1288        let mut current_tool_input = String::new();
1289
1290        while let Some(event) = raw_stream.next().await {
1291            match event {
1292                RawClaudeEvent::System { session_id: sid, .. } => {
1293                    if let Some(id) = sid {
1294                        claude_session_id = Some(id);
1295                    }
1296                }
1297                RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
1298                    if let Some(id) = sid {
1299                        claude_session_id = Some(id);
1300                    }
1301                    match inner {
1302                        StreamEventInner::ContentBlockDelta { delta, .. } => {
1303                            match delta {
1304                                StreamDelta::TextDelta { text } => {
1305                                    response_content.push_str(&text);
1306
1307                                    // Create arbor node for text content (Milestone 2)
1308                                    let event = NodeEvent::ContentText { text: text.clone() };
1309                                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1310                                        current_parent = node_id;
1311                                    }
1312
1313                                    if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await {
1314                                        tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1315                                    }
1316                                }
1317                                StreamDelta::InputJsonDelta { partial_json } => {
1318                                    current_tool_input.push_str(&partial_json);
1319                                }
1320                            }
1321                        }
1322                        StreamEventInner::ContentBlockStart { content_block, .. } => {
1323                            if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
1324                                current_tool_id = Some(id);
1325                                current_tool_name = Some(name);
1326                                current_tool_input.clear();
1327                            }
1328                        }
1329                        StreamEventInner::ContentBlockStop { .. } => {
1330                            if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
1331                                let input: Value = serde_json::from_str(&current_tool_input)
1332                                    .unwrap_or(Value::Object(serde_json::Map::new()));
1333
1334                                // Check if this is a loopback_permit call (tool waiting for approval)
1335                                if name == "mcp__plexus__loopback_permit" {
1336                                    if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await {
1337                                        tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1338                                    }
1339                                }
1340
1341                                // Create arbor node for tool use (Milestone 2)
1342                                let event = NodeEvent::ContentToolUse {
1343                                    id: id.clone(),
1344                                    name: name.clone(),
1345                                    input: input.clone(),
1346                                };
1347                                if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1348                                    current_parent = node_id;
1349                                }
1350
1351                                if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1352                                    tool_name: name,
1353                                    tool_use_id: id,
1354                                    input,
1355                                }).await {
1356                                    tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1357                                }
1358                                current_tool_input.clear();
1359                            }
1360                        }
1361                        StreamEventInner::MessageDelta { delta } => {
1362                            // If stop_reason is tool_use with loopback, mark as awaiting
1363                            if delta.stop_reason == Some("tool_use".to_string()) {
1364                                // Check if we're in loopback mode (already marked above)
1365                            }
1366                        }
1367                        _ => {}
1368                    }
1369                }
1370                RawClaudeEvent::Assistant { message } => {
1371                    if let Some(msg) = message {
1372                        if let Some(content) = msg.content {
1373                            for block in content {
1374                                match block {
1375                                    RawContentBlock::Text { text } => {
1376                                        if response_content.is_empty() {
1377                                            response_content.push_str(&text);
1378
1379                                            // Create arbor node for text content (Milestone 2)
1380                                            let event = NodeEvent::ContentText { text: text.clone() };
1381                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1382                                                current_parent = node_id;
1383                                            }
1384
1385                                            if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await {
1386                                                tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1387                                            }
1388                                        }
1389                                    }
1390                                    RawContentBlock::ToolUse { id, name, input } => {
1391                                        // Create arbor node for tool use (Milestone 2)
1392                                        let event = NodeEvent::ContentToolUse {
1393                                            id: id.clone(),
1394                                            name: name.clone(),
1395                                            input: input.clone(),
1396                                        };
1397                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1398                                            current_parent = node_id;
1399                                        }
1400
1401                                        if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1402                                            tool_name: name,
1403                                            tool_use_id: id,
1404                                            input,
1405                                        }).await {
1406                                            tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1407                                        }
1408                                    }
1409                                    RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
1410                                        // Tool completed - back to running if was awaiting
1411                                        if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await {
1412                                            tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1413                                        }
1414
1415                                        // Create arbor node for tool result (Milestone 2)
1416                                        let event = NodeEvent::UserToolResult {
1417                                            tool_use_id: tool_use_id.clone(),
1418                                            content: content.clone().unwrap_or_default(),
1419                                            is_error: is_error.unwrap_or(false),
1420                                        };
1421                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1422                                            current_parent = node_id;
1423                                        }
1424
1425                                        if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
1426                                            tool_use_id,
1427                                            output: content.unwrap_or_default(),
1428                                            is_error: is_error.unwrap_or(false),
1429                                        }).await {
1430                                            tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1431                                        }
1432                                    }
1433                                    RawContentBlock::Thinking { thinking, .. } => {
1434                                        // Create arbor node for thinking (Milestone 2)
1435                                        let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
1436                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1437                                            current_parent = node_id;
1438                                        }
1439
1440                                        if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await {
1441                                            tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1442                                        }
1443                                    }
1444                                }
1445                            }
1446                        }
1447                    }
1448                }
1449                RawClaudeEvent::Result {
1450                    session_id: sid,
1451                    cost_usd: cost,
1452                    num_turns: turns,
1453                    is_error,
1454                    error,
1455                    ..
1456                } => {
1457                    if let Some(id) = sid {
1458                        claude_session_id = Some(id);
1459                    }
1460                    cost_usd = cost;
1461                    num_turns = turns;
1462
1463                    if is_error == Some(true) {
1464                        if let Some(err_msg) = error {
1465                            if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await {
1466                                tracing::error!(stream_id = %stream_id, error = %e, "Failed to push error event to stream");
1467                            }
1468                            if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await {
1469                                tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status to Failed");
1470                            }
1471                            return;
1472                        }
1473                    }
1474                }
1475                RawClaudeEvent::Unknown { event_type, data } => {
1476                    match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1477                        Ok(handle) => {
1478                            if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await {
1479                                tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1480                            }
1481                        }
1482                        Err(_) => {
1483                            if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1484                                event_type,
1485                                handle: "storage-failed".to_string(),
1486                                data,
1487                            }).await {
1488                                tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1489                            }
1490                        }
1491                    }
1492                }
1493                RawClaudeEvent::User { .. } => {}
1494                RawClaudeEvent::LaunchCommand { command } => {
1495                    let event = NodeEvent::LaunchCommand { command };
1496                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1497                        current_parent = node_id;
1498                    }
1499                }
1500                RawClaudeEvent::Stderr { text } => {
1501                    tracing::warn!(stderr = %text, "Claude stderr");
1502                    let event = NodeEvent::ClaudeStderr { text };
1503                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1504                        current_parent = node_id;
1505                    }
1506                }
1507            }
1508        }
1509
1510        // 6. Store assistant response
1511        let model_id = format!("claude-code-{}", config.model.as_str());
1512        let assistant_msg = if is_ephemeral {
1513            match storage.message_create_ephemeral(
1514                &session_id,
1515                MessageRole::Assistant,
1516                response_content,
1517                Some(model_id),
1518                None,
1519                None,
1520                cost_usd,
1521            ).await {
1522                Ok(m) => m,
1523                Err(e) => {
1524                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1525                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1526                    }
1527                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1528                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1529                    }
1530                    return;
1531                }
1532            }
1533        } else {
1534            match storage.message_create(
1535                &session_id,
1536                MessageRole::Assistant,
1537                response_content,
1538                Some(model_id),
1539                None,
1540                None,
1541                cost_usd,
1542            ).await {
1543                Ok(m) => m,
1544                Err(e) => {
1545                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1546                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1547                    }
1548                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1549                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1550                    }
1551                    return;
1552                }
1553            }
1554        };
1555
1556        // Create AssistantComplete event node (Milestone 2)
1557        let complete_event = NodeEvent::AssistantComplete { usage: None };
1558        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &complete_event).await {
1559            current_parent = node_id;
1560        }
1561
1562        // 7. Create assistant node in Arbor
1563        let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1564        let assistant_node_id = if is_ephemeral {
1565            match storage.arbor().node_create_external_ephemeral(
1566                &config.head.tree_id,
1567                Some(current_parent),
1568                assistant_handle,
1569                None,
1570            ).await {
1571                Ok(id) => id,
1572                Err(e) => {
1573                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1574                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1575                    }
1576                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1577                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1578                    }
1579                    return;
1580                }
1581            }
1582        } else {
1583            match storage.arbor().node_create_external(
1584                &config.head.tree_id,
1585                Some(current_parent),
1586                assistant_handle,
1587                None,
1588            ).await {
1589                Ok(id) => id,
1590                Err(e) => {
1591                    if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1592                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1593                    }
1594                    if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1595                        tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1596                    }
1597                    return;
1598                }
1599            }
1600        };
1601
1602        let new_head = Position::new(config.head.tree_id, assistant_node_id);
1603
1604        // 8. Update session head (skip for ephemeral)
1605        if !is_ephemeral {
1606            if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1607                if let Err(e2) = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await {
1608                    tracing::error!(stream_id = %stream_id, error = %e2, "Failed to push error event to stream");
1609                }
1610                if let Err(e2) = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await {
1611                    tracing::error!(stream_id = %stream_id, error = %e2, "Failed to update stream status to Failed");
1612                }
1613                return;
1614            }
1615        }
1616
1617        // 9. Push Complete event and mark stream as complete
1618        if let Err(e) = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1619            new_head: if is_ephemeral { config.head } else { new_head },
1620            claude_session_id: claude_session_id.unwrap_or_default(),
1621            usage: Some(ChatUsage {
1622                input_tokens: None,
1623                output_tokens: None,
1624                cost_usd,
1625                num_turns,
1626            }),
1627        }).await {
1628            tracing::error!(stream_id = %stream_id, error = %e, "Failed to push event to stream");
1629        }
1630
1631        if let Err(e) = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await {
1632            tracing::error!(stream_id = %stream_id, error = %e, "Failed to update stream status");
1633        }
1634    }
1635}