Skip to main content

plexus_substrate/activations/claudecode/
activation.rs

1use super::{
2    executor::{ClaudeCodeExecutor, LaunchConfig},
3    sessions,
4    storage::ClaudeCodeStorage,
5    types::*,
6};
7use crate::activations::arbor::{NodeId, TreeId};
8use crate::plexus::{HubContext, NoParent};
9use async_stream::stream;
10use futures::{Stream, StreamExt};
11use plexus_macros::hub_methods;
12use serde_json::Value;
13use std::marker::PhantomData;
14use std::sync::{Arc, OnceLock};
15use tracing::Instrument;
16
17/// ClaudeCode activation - manages Claude Code sessions with Arbor-backed history
18///
19/// Generic over `P: HubContext` to allow different parent contexts:
20/// - `Weak<DynamicHub>` when registered with a DynamicHub
21/// - Custom context types for sub-hubs
22/// - `NoParent` for standalone testing
23#[derive(Clone)]
24pub struct ClaudeCode<P: HubContext = NoParent> {
25    pub storage: Arc<ClaudeCodeStorage>,
26    executor: ClaudeCodeExecutor,
27    /// Hub reference for resolving foreign handles when walking arbor trees
28    hub: Arc<OnceLock<P>>,
29    _phantom: PhantomData<P>,
30}
31
32impl<P: HubContext> ClaudeCode<P> {
33    /// Create a new ClaudeCode with a specific parent context type
34    pub fn with_context_type(storage: Arc<ClaudeCodeStorage>) -> Self {
35        Self {
36            storage,
37            executor: ClaudeCodeExecutor::new(),
38            hub: Arc::new(OnceLock::new()),
39            _phantom: PhantomData,
40        }
41    }
42
43    /// Create with custom executor and parent context type
44    pub fn with_executor_and_context(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
45        Self {
46            storage,
47            executor,
48            hub: Arc::new(OnceLock::new()),
49            _phantom: PhantomData,
50        }
51    }
52
53    /// Inject parent context for resolving foreign handles
54    ///
55    /// Called during hub construction (e.g., via Arc::new_cyclic for DynamicHub).
56    /// This allows ClaudeCode to resolve handles from other activations when walking arbor trees.
57    pub fn inject_parent(&self, parent: P) {
58        let _ = self.hub.set(parent);
59    }
60
61    /// Check if parent context has been injected
62    pub fn has_parent(&self) -> bool {
63        self.hub.get().is_some()
64    }
65
66    /// Get a reference to the parent context
67    ///
68    /// Returns None if inject_parent hasn't been called yet.
69    pub fn parent(&self) -> Option<&P> {
70        self.hub.get()
71    }
72
73    /// Resolve a claudecode handle to its message content
74    ///
75    /// Called by the macro-generated resolve_handle method.
76    /// Handle format: {plugin_id}@1.0.0::chat:msg-{uuid}:{role}:{name}
77    pub async fn resolve_handle_impl(
78        &self,
79        handle: &crate::types::Handle,
80    ) -> Result<crate::plexus::PlexusStream, crate::plexus::PlexusError> {
81        use crate::plexus::{PlexusError, wrap_stream};
82        use async_stream::stream;
83
84        let storage = self.storage.clone();
85
86        // Join meta parts into colon-separated identifier
87        // Format: "msg-{uuid}:{role}:{name}"
88        if handle.meta.is_empty() {
89            return Err(PlexusError::ExecutionError(
90                "ClaudeCode handle missing message ID in meta".to_string()
91            ));
92        }
93        let identifier = handle.meta.join(":");
94
95        // Extract name from meta if present (for response)
96        let name = handle.meta.get(2).cloned();
97
98        let result_stream = stream! {
99            match storage.resolve_message_handle(&identifier).await {
100                Ok(message) => {
101                    yield ResolveResult::Message {
102                        id: message.id.to_string(),
103                        role: message.role.as_str().to_string(),
104                        content: message.content,
105                        model: message.model_id,
106                        name: name.unwrap_or_else(|| message.role.as_str().to_string()),
107                    };
108                }
109                Err(e) => {
110                    yield ResolveResult::Error {
111                        message: format!("Failed to resolve handle: {}", e.message),
112                    };
113                }
114            }
115        };
116
117        Ok(wrap_stream(result_stream, "claudecode.resolve_handle", vec!["claudecode".into()]))
118    }
119}
120
121/// Convenience constructors for ClaudeCode with NoParent (standalone/testing)
122impl ClaudeCode<NoParent> {
123    pub fn new(storage: Arc<ClaudeCodeStorage>) -> Self {
124        Self::with_context_type(storage)
125    }
126
127    pub fn with_executor(storage: Arc<ClaudeCodeStorage>, executor: ClaudeCodeExecutor) -> Self {
128        Self::with_executor_and_context(storage, executor)
129    }
130}
131
132// ═══════════════════════════════════════════════════════════════════════════
133// ARBOR EVENT CAPTURE HELPERS (Milestone 2)
134// ═══════════════════════════════════════════════════════════════════════════
135
136/// Create an arbor text node for a chat event
137async fn create_event_node(
138    arbor: &crate::activations::arbor::ArborStorage,
139    tree_id: &crate::activations::arbor::TreeId,
140    parent_id: &crate::activations::arbor::NodeId,
141    event: &NodeEvent,
142) -> Result<crate::activations::arbor::NodeId, String> {
143    let json = serde_json::to_string(event)
144        .map_err(|e| format!("Failed to serialize event: {}", e))?;
145
146    arbor.node_create_text(tree_id, Some(*parent_id), json, None)
147        .await
148        .map_err(|e| e.to_string())
149}
150
151#[hub_methods(
152    namespace = "claudecode",
153    version = "1.0.0",
154    description = "Manage Claude Code sessions with Arbor-backed conversation history",
155    resolve_handle
156)]
157impl<P: HubContext> ClaudeCode<P> {
158    /// Create a new Claude Code session
159    #[plexus_macros::hub_method(params(
160        name = "Human-readable name for the session",
161        working_dir = "Working directory for Claude Code",
162        model = "Model to use (opus, sonnet, haiku)",
163        system_prompt = "Optional system prompt / instructions",
164        loopback_enabled = "Enable loopback mode - routes tool permissions through parent for approval"
165    ))]
166    pub async fn create(
167        &self,
168        name: String,
169        working_dir: String,
170        model: Model,
171        system_prompt: Option<String>,
172        loopback_enabled: Option<bool>,
173    ) -> impl Stream<Item = CreateResult> + Send + 'static {
174        let storage = self.storage.clone();
175        let loopback = loopback_enabled.unwrap_or(false);
176
177        stream! {
178            match storage.session_create(name, working_dir, model, system_prompt, None, loopback, None).await {
179                Ok(config) => {
180                    yield CreateResult::Ok {
181                        id: config.id,
182                        head: config.head,
183                    };
184                }
185                Err(e) => {
186                    yield CreateResult::Err { message: e.to_string() };
187                }
188            }
189        }
190    }
191
192    /// Chat with a session, streaming tokens like Cone
193    #[plexus_macros::hub_method(
194        streaming,
195        params(
196            name = "Session name to chat with",
197            prompt = "User message / prompt to send",
198            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
199        )
200    )]
201    pub async fn chat(
202        &self,
203        name: String,
204        prompt: String,
205        ephemeral: Option<bool>,
206    ) -> impl Stream<Item = ChatEvent> + Send + 'static {
207        let storage = self.storage.clone();
208        let executor = self.executor.clone();
209
210        // Resolve before entering stream to avoid lifetime issues
211        let resolve_result = storage.session_get_by_name(&name).await;
212
213        stream! {
214            let is_ephemeral = ephemeral.unwrap_or(false);
215
216            // 1. Resolve and load session
217            let config = match resolve_result {
218                Ok(c) => c,
219                Err(e) => {
220                    yield ChatEvent::Err { message: e.to_string() };
221                    return;
222                }
223            };
224
225            let session_id = config.id;
226
227            // 2. Store user message in our database (ephemeral if requested)
228            let user_msg = if is_ephemeral {
229                match storage.message_create_ephemeral(
230                    &session_id,
231                    MessageRole::User,
232                    prompt.clone(),
233                    None, None, None, None,
234                ).await {
235                    Ok(m) => m,
236                    Err(e) => {
237                        yield ChatEvent::Err { message: e.to_string() };
238                        return;
239                    }
240                }
241            } else {
242                match storage.message_create(
243                    &session_id,
244                    MessageRole::User,
245                    prompt.clone(),
246                    None, None, None, None,
247                ).await {
248                    Ok(m) => m,
249                    Err(e) => {
250                        yield ChatEvent::Err { message: e.to_string() };
251                        return;
252                    }
253                }
254            };
255
256            // 3. Create user message node in Arbor (ephemeral if requested)
257            let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
258            let user_node_id = if is_ephemeral {
259                match storage.arbor().node_create_external_ephemeral(
260                    &config.head.tree_id,
261                    Some(config.head.node_id),
262                    user_handle,
263                    None,
264                ).await {
265                    Ok(id) => id,
266                    Err(e) => {
267                        yield ChatEvent::Err { message: e.to_string() };
268                        return;
269                    }
270                }
271            } else {
272                match storage.arbor().node_create_external(
273                    &config.head.tree_id,
274                    Some(config.head.node_id),
275                    user_handle,
276                    None,
277                ).await {
278                    Ok(id) => id,
279                    Err(e) => {
280                        yield ChatEvent::Err { message: e.to_string() };
281                        return;
282                    }
283                }
284            };
285
286            let user_position = Position::new(config.head.tree_id, user_node_id);
287
288            // Track current parent for event node chain (Milestone 2)
289            let mut current_parent = user_node_id;
290
291            // Create UserMessage event node (Milestone 2)
292            let user_event = NodeEvent::UserMessage { content: prompt.clone() };
293            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &user_event).await {
294                current_parent = node_id;
295            }
296
297            // Create AssistantStart event node (Milestone 2)
298            let start_event = NodeEvent::AssistantStart;
299            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &start_event).await {
300                current_parent = node_id;
301            }
302
303            // 4. Emit Start
304            yield ChatEvent::Start {
305                id: session_id,
306                user_position,
307            };
308
309            // 5. Build launch config
310            let launch_config = LaunchConfig {
311                query: prompt,
312                session_id: config.claude_session_id.clone(),
313                fork_session: false,
314                model: config.model,
315                working_dir: config.working_dir.clone(),
316                system_prompt: config.system_prompt.clone(),
317                mcp_config: config.mcp_config.clone(),
318                loopback_enabled: config.loopback_enabled,
319                loopback_session_id: if config.loopback_enabled {
320                    Some(session_id.to_string())
321                } else {
322                    None
323                },
324                ..Default::default()
325            };
326
327            // 6. Launch Claude and stream events
328            let mut response_content = String::new();
329            let mut claude_session_id = config.claude_session_id.clone();
330            let mut cost_usd = None;
331            let mut num_turns = None;
332
333            let mut raw_stream = executor.launch(launch_config).await;
334
335            // Track current tool use for streaming tool input
336            let mut current_tool_id: Option<String> = None;
337            let mut current_tool_name: Option<String> = None;
338            let mut current_tool_input = String::new();
339
340            while let Some(event) = raw_stream.next().await {
341                match event {
342                    RawClaudeEvent::System { session_id: sid, .. } => {
343                        if let Some(id) = sid {
344                            claude_session_id = Some(id);
345                        }
346                    }
347                    RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
348                        if let Some(id) = sid {
349                            claude_session_id = Some(id);
350                        }
351                        match inner {
352                            StreamEventInner::ContentBlockDelta { delta, .. } => {
353                                match delta {
354                                    StreamDelta::TextDelta { text } => {
355                                        response_content.push_str(&text);
356
357                                        // Create arbor node for text content (Milestone 2)
358                                        let event = NodeEvent::ContentText { text: text.clone() };
359                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
360                                            current_parent = node_id;
361                                        }
362
363                                        yield ChatEvent::Content { text };
364                                    }
365                                    StreamDelta::InputJsonDelta { partial_json } => {
366                                        current_tool_input.push_str(&partial_json);
367                                    }
368                                }
369                            }
370                            StreamEventInner::ContentBlockStart { content_block, .. } => {
371                                if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
372                                    current_tool_id = Some(id);
373                                    current_tool_name = Some(name);
374                                    current_tool_input.clear();
375                                }
376                            }
377                            StreamEventInner::ContentBlockStop { .. } => {
378                                // Emit tool use if we were building one
379                                if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
380                                    let input: Value = serde_json::from_str(&current_tool_input)
381                                        .unwrap_or(Value::Object(serde_json::Map::new()));
382
383                                    // Create arbor node for tool use (Milestone 2)
384                                    let event = NodeEvent::ContentToolUse {
385                                        id: id.clone(),
386                                        name: name.clone(),
387                                        input: input.clone(),
388                                    };
389                                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
390                                        current_parent = node_id;
391                                    }
392
393                                    yield ChatEvent::ToolUse {
394                                        tool_name: name,
395                                        tool_use_id: id,
396                                        input,
397                                    };
398                                    current_tool_input.clear();
399                                }
400                            }
401                            _ => {}
402                        }
403                    }
404                    RawClaudeEvent::Assistant { message } => {
405                        // Still handle non-streaming assistant messages (tool results, etc.)
406                        if let Some(msg) = message {
407                            if let Some(content) = msg.content {
408                                for block in content {
409                                    match block {
410                                        RawContentBlock::Text { text } => {
411                                            // Only emit if we haven't already streamed this
412                                            if response_content.is_empty() {
413                                                response_content.push_str(&text);
414
415                                                // Create arbor node for text content (Milestone 2)
416                                                let event = NodeEvent::ContentText { text: text.clone() };
417                                                if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
418                                                    current_parent = node_id;
419                                                }
420
421                                                yield ChatEvent::Content { text };
422                                            }
423                                        }
424                                        RawContentBlock::ToolUse { id, name, input } => {
425                                            // Create arbor node for tool use (Milestone 2)
426                                            let event = NodeEvent::ContentToolUse {
427                                                id: id.clone(),
428                                                name: name.clone(),
429                                                input: input.clone(),
430                                            };
431                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
432                                                current_parent = node_id;
433                                            }
434
435                                            yield ChatEvent::ToolUse {
436                                                tool_name: name,
437                                                tool_use_id: id,
438                                                input,
439                                            };
440                                        }
441                                        RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
442                                            // Create arbor node for tool result (Milestone 2)
443                                            let event = NodeEvent::UserToolResult {
444                                                tool_use_id: tool_use_id.clone(),
445                                                content: content.clone().unwrap_or_default(),
446                                                is_error: is_error.unwrap_or(false),
447                                            };
448                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
449                                                current_parent = node_id;
450                                            }
451
452                                            yield ChatEvent::ToolResult {
453                                                tool_use_id,
454                                                output: content.unwrap_or_default(),
455                                                is_error: is_error.unwrap_or(false),
456                                            };
457                                        }
458                                        RawContentBlock::Thinking { thinking, .. } => {
459                                            // Create arbor node for thinking (Milestone 2)
460                                            let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
461                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
462                                                current_parent = node_id;
463                                            }
464
465                                            yield ChatEvent::Thinking { thinking };
466                                        }
467                                    }
468                                }
469                            }
470                        }
471                    }
472                    RawClaudeEvent::Result {
473                        session_id: sid,
474                        cost_usd: cost,
475                        num_turns: turns,
476                        is_error,
477                        error,
478                        ..
479                    } => {
480                        if let Some(id) = sid {
481                            claude_session_id = Some(id);
482                        }
483                        cost_usd = cost;
484                        num_turns = turns;
485
486                        // Check for error
487                        if is_error == Some(true) {
488                            if let Some(err_msg) = error {
489                                yield ChatEvent::Err { message: err_msg };
490                                return;
491                            }
492                        }
493                    }
494                    RawClaudeEvent::Unknown { event_type, data } => {
495                        // Store unknown event and get handle
496                        match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
497                            Ok(handle) => {
498                                tracing::debug!(event_type = %event_type, handle = %handle, "Unknown Claude event stored");
499                                yield ChatEvent::Passthrough { event_type, handle, data };
500                            }
501                            Err(e) => {
502                                tracing::warn!(event_type = %event_type, error = %e, "Failed to store unknown event");
503                                // Still forward the event even if storage fails
504                                yield ChatEvent::Passthrough {
505                                    event_type,
506                                    handle: "storage-failed".to_string(),
507                                    data,
508                                };
509                            }
510                        }
511                    }
512                    RawClaudeEvent::User { .. } => {
513                        // User events are echoed back but we don't need to process them
514                    }
515                }
516            }
517
518            // 7. Store assistant response (ephemeral if requested)
519            let model_id = format!("claude-code-{}", config.model.as_str());
520            let assistant_msg = if is_ephemeral {
521                match storage.message_create_ephemeral(
522                    &session_id,
523                    MessageRole::Assistant,
524                    response_content,
525                    Some(model_id),
526                    None,
527                    None,
528                    cost_usd,
529                ).await {
530                    Ok(m) => m,
531                    Err(e) => {
532                        yield ChatEvent::Err { message: e.to_string() };
533                        return;
534                    }
535                }
536            } else {
537                match storage.message_create(
538                    &session_id,
539                    MessageRole::Assistant,
540                    response_content,
541                    Some(model_id),
542                    None,
543                    None,
544                    cost_usd,
545                ).await {
546                    Ok(m) => m,
547                    Err(e) => {
548                        yield ChatEvent::Err { message: e.to_string() };
549                        return;
550                    }
551                }
552            };
553
554            // Create AssistantComplete event node (Milestone 2)
555            let complete_event = NodeEvent::AssistantComplete { usage: None };
556            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &complete_event).await {
557                current_parent = node_id;
558            }
559
560            // 8. Create assistant node in Arbor (ephemeral if requested)
561            let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
562            let assistant_node_id = if is_ephemeral {
563                match storage.arbor().node_create_external_ephemeral(
564                    &config.head.tree_id,
565                    Some(current_parent),
566                    assistant_handle,
567                    None,
568                ).await {
569                    Ok(id) => id,
570                    Err(e) => {
571                        yield ChatEvent::Err { message: e.to_string() };
572                        return;
573                    }
574                }
575            } else {
576                match storage.arbor().node_create_external(
577                    &config.head.tree_id,
578                    Some(current_parent),
579                    assistant_handle,
580                    None,
581                ).await {
582                    Ok(id) => id,
583                    Err(e) => {
584                        yield ChatEvent::Err { message: e.to_string() };
585                        return;
586                    }
587                }
588            };
589
590            let new_head = Position::new(config.head.tree_id, assistant_node_id);
591
592            // 9. Update session head and Claude session ID (skip for ephemeral)
593            if !is_ephemeral {
594                if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
595                    yield ChatEvent::Err { message: e.to_string() };
596                    return;
597                }
598            }
599
600            // 10. Emit Complete
601            // For ephemeral, new_head points to the ephemeral node (not the session's actual head)
602            yield ChatEvent::Complete {
603                new_head: if is_ephemeral { config.head } else { new_head },
604                claude_session_id: claude_session_id.unwrap_or_default(),
605                usage: Some(ChatUsage {
606                    input_tokens: None,
607                    output_tokens: None,
608                    cost_usd,
609                    num_turns,
610                }),
611            };
612        }
613    }
614
615    /// Get session configuration details
616    #[plexus_macros::hub_method]
617    async fn get(&self, name: String) -> impl Stream<Item = GetResult> + Send + 'static {
618        let result = self.storage.session_get_by_name(&name).await;
619
620        stream! {
621            match result {
622                Ok(config) => {
623                    yield GetResult::Ok { config };
624                }
625                Err(e) => {
626                    yield GetResult::Err { message: e.to_string() };
627                }
628            }
629        }
630    }
631
632    /// List all Claude Code sessions
633    #[plexus_macros::hub_method]
634    async fn list(&self) -> impl Stream<Item = ListResult> + Send + 'static {
635        let storage = self.storage.clone();
636
637        stream! {
638            match storage.session_list().await {
639                Ok(sessions) => {
640                    yield ListResult::Ok { sessions };
641                }
642                Err(e) => {
643                    yield ListResult::Err { message: e.to_string() };
644                }
645            }
646        }
647    }
648
649    /// Delete a session
650    #[plexus_macros::hub_method]
651    async fn delete(&self, name: String) -> impl Stream<Item = DeleteResult> + Send + 'static {
652        let storage = self.storage.clone();
653        let resolve_result = storage.session_get_by_name(&name).await;
654
655        stream! {
656            let config = match resolve_result {
657                Ok(c) => c,
658                Err(e) => {
659                    yield DeleteResult::Err { message: e.to_string() };
660                    return;
661                }
662            };
663
664            match storage.session_delete(&config.id).await {
665                Ok(_) => {
666                    yield DeleteResult::Ok { id: config.id };
667                }
668                Err(e) => {
669                    yield DeleteResult::Err { message: e.to_string() };
670                }
671            }
672        }
673    }
674
675    /// Fork a session to create a branch point
676    #[plexus_macros::hub_method]
677    async fn fork(
678        &self,
679        name: String,
680        new_name: String,
681    ) -> impl Stream<Item = ForkResult> + Send + 'static {
682        let storage = self.storage.clone();
683        let resolve_result = storage.session_get_by_name(&name).await;
684
685        stream! {
686            // Get parent session
687            let parent = match resolve_result {
688                Ok(c) => c,
689                Err(e) => {
690                    yield ForkResult::Err { message: e.to_string() };
691                    return;
692                }
693            };
694
695            // Create new session starting at parent's head position
696            // The new session will fork Claude's session on first chat
697            let new_config = match storage.session_create(
698                new_name,
699                parent.working_dir.clone(),
700                parent.model,
701                parent.system_prompt.clone(),
702                parent.mcp_config.clone(),
703                parent.loopback_enabled,
704                None,
705            ).await {
706                Ok(mut c) => {
707                    // Update head to parent's position (share the same tree point)
708                    // This creates a branch - the new session diverges from here
709                    if let Err(e) = storage.session_update_head(&c.id, parent.head.node_id, None).await {
710                        yield ForkResult::Err { message: e.to_string() };
711                        return;
712                    }
713                    c.head = parent.head;
714                    c
715                }
716                Err(e) => {
717                    yield ForkResult::Err { message: e.to_string() };
718                    return;
719                }
720            };
721
722            yield ForkResult::Ok {
723                id: new_config.id,
724                head: new_config.head,
725            };
726        }
727    }
728
729    /// Start an async chat - returns immediately with stream_id for polling
730    ///
731    /// This is the non-blocking version of chat, designed for loopback scenarios
732    /// where the parent needs to poll for events and handle tool approvals.
733    #[plexus_macros::hub_method(
734        params(
735            name = "Session name to chat with",
736            prompt = "User message / prompt to send",
737            ephemeral = "If true, creates nodes but doesn't advance head and marks for deletion"
738        )
739    )]
740    async fn chat_async(
741        &self,
742        name: String,
743        prompt: String,
744        ephemeral: Option<bool>,
745    ) -> impl Stream<Item = ChatStartResult> + Send + 'static {
746        let storage = self.storage.clone();
747        let executor = self.executor.clone();
748
749        // Resolve session before entering stream
750        let resolve_result = storage.session_get_by_name(&name).await;
751
752        stream! {
753            let is_ephemeral = ephemeral.unwrap_or(false);
754
755            // 1. Resolve session
756            let config = match resolve_result {
757                Ok(c) => c,
758                Err(e) => {
759                    yield ChatStartResult::Err { message: e.to_string() };
760                    return;
761                }
762            };
763
764            let session_id = config.id;
765
766            // 2. Create stream buffer
767            let stream_id = match storage.stream_create(session_id).await {
768                Ok(id) => id,
769                Err(e) => {
770                    yield ChatStartResult::Err { message: e.to_string() };
771                    return;
772                }
773            };
774
775            // 3. Spawn background task to run the chat
776            let storage_bg = storage.clone();
777            let executor_bg = executor.clone();
778            let prompt_bg = prompt.clone();
779            let config_bg = config.clone();
780            let stream_id_bg = stream_id;
781
782            tokio::spawn(async move {
783                Self::run_chat_background(
784                    storage_bg,
785                    executor_bg,
786                    config_bg,
787                    prompt_bg,
788                    is_ephemeral,
789                    stream_id_bg,
790                ).await;
791            }.instrument(tracing::info_span!("chat_async_bg", stream_id = %stream_id)));
792
793            // 4. Return immediately with stream_id
794            yield ChatStartResult::Ok {
795                stream_id,
796                session_id,
797            };
798        }
799    }
800
801    /// Poll a stream for new events
802    ///
803    /// Returns events since the last poll (or from the specified offset).
804    /// Use this to read events from an async chat started with chat_async.
805    #[plexus_macros::hub_method(
806        params(
807            stream_id = "Stream ID returned from chat_async",
808            from_seq = "Optional: start reading from this sequence number",
809            limit = "Optional: max events to return (default 100)"
810        )
811    )]
812    async fn poll(
813        &self,
814        stream_id: StreamId,
815        from_seq: Option<u64>,
816        limit: Option<u64>,
817    ) -> impl Stream<Item = PollResult> + Send + 'static {
818        let storage = self.storage.clone();
819
820        stream! {
821            let limit_usize = limit.map(|l| l as usize);
822
823            match storage.stream_poll(&stream_id, from_seq, limit_usize).await {
824                Ok((info, events)) => {
825                    let has_more = info.read_position < info.event_count;
826                    yield PollResult::Ok {
827                        status: info.status,
828                        events,
829                        read_position: info.read_position,
830                        total_events: info.event_count,
831                        has_more,
832                    };
833                }
834                Err(e) => {
835                    yield PollResult::Err { message: e.to_string() };
836                }
837            }
838        }
839    }
840
841    /// List active streams
842    ///
843    /// Returns all active streams, optionally filtered by session.
844    #[plexus_macros::hub_method(
845        params(
846            session_id = "Optional: filter by session ID"
847        )
848    )]
849    async fn streams(
850        &self,
851        session_id: Option<ClaudeCodeId>,
852    ) -> impl Stream<Item = StreamListResult> + Send + 'static {
853        let storage = self.storage.clone();
854
855        stream! {
856            let streams = if let Some(sid) = session_id {
857                storage.stream_list_for_session(&sid).await
858            } else {
859                storage.stream_list().await
860            };
861
862            yield StreamListResult::Ok { streams };
863        }
864    }
865
866    /// Get arbor tree information for a session
867    #[plexus_macros::hub_method(params(
868        name = "Session name"
869    ))]
870    async fn get_tree(
871        &self,
872        name: String,
873    ) -> impl Stream<Item = GetTreeResult> + Send + 'static {
874        let storage = self.storage.clone();
875
876        stream! {
877            let config = match storage.session_get_by_name(&name).await {
878                Ok(c) => c,
879                Err(e) => {
880                    yield GetTreeResult::Err { message: e.to_string() };
881                    return;
882                }
883            };
884
885            yield GetTreeResult::Ok {
886                tree_id: config.head.tree_id,
887                head: config.head.node_id,
888            };
889        }
890    }
891
892    /// Render arbor tree as Claude API messages
893    #[plexus_macros::hub_method(params(
894        name = "Session name",
895        start = "Optional start node (default: root)",
896        end = "Optional end node (default: head)"
897    ))]
898    async fn render_context(
899        &self,
900        name: String,
901        start: Option<NodeId>,
902        end: Option<NodeId>,
903    ) -> impl Stream<Item = RenderResult> + Send + 'static {
904        let storage = self.storage.clone();
905
906        stream! {
907            // Get session config
908            let config = match storage.session_get_by_name(&name).await {
909                Ok(c) => c,
910                Err(e) => {
911                    yield RenderResult::Err { message: e.to_string() };
912                    return;
913                }
914            };
915
916            let tree_id = config.head.tree_id;
917            let end_node = end.unwrap_or(config.head.node_id);
918
919            // Get root if start not specified
920            let start_node = if let Some(s) = start {
921                s
922            } else {
923                match storage.arbor().tree_get(&tree_id).await {
924                    Ok(tree) => tree.root,
925                    Err(e) => {
926                        yield RenderResult::Err { message: e.to_string() };
927                        return;
928                    }
929                }
930            };
931
932            // Render messages
933            let messages = match storage.render_messages(&tree_id, &start_node, &end_node).await {
934                Ok(m) => m,
935                Err(e) => {
936                    yield RenderResult::Err { message: e.to_string() };
937                    return;
938                }
939            };
940
941            yield RenderResult::Ok { messages };
942        }
943    }
944
945    /// List all session files for a project
946    #[plexus_macros::hub_method(params(
947        project_path = "Project path (e.g., '-workspace-hypermemetic-hub-codegen')"
948    ))]
949    async fn sessions_list(
950        &self,
951        project_path: String,
952    ) -> impl Stream<Item = SessionsListResult> + Send + 'static {
953        stream! {
954            match sessions::list_sessions(&project_path).await {
955                Ok(sessions) => {
956                    yield SessionsListResult::Ok { sessions };
957                }
958                Err(e) => {
959                    yield SessionsListResult::Err { message: e };
960                }
961            }
962        }
963    }
964
965    /// Get events from a session file
966    #[plexus_macros::hub_method(params(
967        project_path = "Project path",
968        session_id = "Session ID (UUID)"
969    ))]
970    async fn sessions_get(
971        &self,
972        project_path: String,
973        session_id: String,
974    ) -> impl Stream<Item = SessionsGetResult> + Send + 'static {
975        stream! {
976            match sessions::read_session(&project_path, &session_id).await {
977                Ok(events) => {
978                    let event_count = events.len();
979                    // Convert to JSON values for transport
980                    let events_json: Vec<serde_json::Value> = events.into_iter()
981                        .filter_map(|e| serde_json::to_value(e).ok())
982                        .collect();
983
984                    yield SessionsGetResult::Ok {
985                        session_id: session_id.clone(),
986                        event_count,
987                        events: events_json,
988                    };
989                }
990                Err(e) => {
991                    yield SessionsGetResult::Err { message: e };
992                }
993            }
994        }
995    }
996
997    /// Import a session file into arbor
998    #[plexus_macros::hub_method(params(
999        project_path = "Project path",
1000        session_id = "Session ID to import",
1001        owner_id = "Owner ID for the new tree (default: 'claudecode')"
1002    ))]
1003    async fn sessions_import(
1004        &self,
1005        project_path: String,
1006        session_id: String,
1007        owner_id: Option<String>,
1008    ) -> impl Stream<Item = SessionsImportResult> + Send + 'static {
1009        let storage = self.storage.clone();
1010
1011        stream! {
1012            let owner = owner_id.unwrap_or_else(|| "claudecode".to_string());
1013
1014            match sessions::import_to_arbor(storage.arbor(), &project_path, &session_id, &owner).await {
1015                Ok(tree_id) => {
1016                    yield SessionsImportResult::Ok {
1017                        tree_id,
1018                        session_id,
1019                    };
1020                }
1021                Err(e) => {
1022                    yield SessionsImportResult::Err { message: e };
1023                }
1024            }
1025        }
1026    }
1027
1028    /// Export an arbor tree to a session file
1029    #[plexus_macros::hub_method(params(
1030        tree_id = "Arbor tree ID to export",
1031        project_path = "Project path",
1032        session_id = "Session ID for the exported file"
1033    ))]
1034    async fn sessions_export(
1035        &self,
1036        tree_id: TreeId,
1037        project_path: String,
1038        session_id: String,
1039    ) -> impl Stream<Item = SessionsExportResult> + Send + 'static {
1040        let storage = self.storage.clone();
1041
1042        stream! {
1043            match sessions::export_from_arbor(storage.arbor(), &tree_id, &project_path, &session_id).await {
1044                Ok(()) => {
1045                    yield SessionsExportResult::Ok {
1046                        tree_id,
1047                        session_id,
1048                    };
1049                }
1050                Err(e) => {
1051                    yield SessionsExportResult::Err { message: e };
1052                }
1053            }
1054        }
1055    }
1056
1057    /// Delete a session file
1058    #[plexus_macros::hub_method(params(
1059        project_path = "Project path",
1060        session_id = "Session ID to delete"
1061    ))]
1062    async fn sessions_delete(
1063        &self,
1064        project_path: String,
1065        session_id: String,
1066    ) -> impl Stream<Item = SessionsDeleteResult> + Send + 'static {
1067        stream! {
1068            match sessions::delete_session(&project_path, &session_id).await {
1069                Ok(()) => {
1070                    yield SessionsDeleteResult::Ok {
1071                        session_id,
1072                        deleted: true,
1073                    };
1074                }
1075                Err(e) => {
1076                    yield SessionsDeleteResult::Err { message: e };
1077                }
1078            }
1079        }
1080    }
1081}
1082
1083// Background task implementation (outside the hub_methods block)
1084impl<P: HubContext> ClaudeCode<P> {
1085    /// Run chat in background, pushing events to stream buffer
1086    async fn run_chat_background(
1087        storage: Arc<ClaudeCodeStorage>,
1088        executor: ClaudeCodeExecutor,
1089        config: ClaudeCodeConfig,
1090        prompt: String,
1091        is_ephemeral: bool,
1092        stream_id: StreamId,
1093    ) {
1094        let session_id = config.id;
1095
1096        // 1. Store user message
1097        let user_msg = if is_ephemeral {
1098            match storage.message_create_ephemeral(
1099                &session_id,
1100                MessageRole::User,
1101                prompt.clone(),
1102                None, None, None, None,
1103            ).await {
1104                Ok(m) => m,
1105                Err(e) => {
1106                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1107                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1108                    return;
1109                }
1110            }
1111        } else {
1112            match storage.message_create(
1113                &session_id,
1114                MessageRole::User,
1115                prompt.clone(),
1116                None, None, None, None,
1117            ).await {
1118                Ok(m) => m,
1119                Err(e) => {
1120                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1121                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1122                    return;
1123                }
1124            }
1125        };
1126
1127        // 2. Create user node in Arbor
1128        let user_handle = ClaudeCodeStorage::message_to_handle(&user_msg, "user");
1129        let user_node_id = if is_ephemeral {
1130            match storage.arbor().node_create_external_ephemeral(
1131                &config.head.tree_id,
1132                Some(config.head.node_id),
1133                user_handle,
1134                None,
1135            ).await {
1136                Ok(id) => id,
1137                Err(e) => {
1138                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1139                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1140                    return;
1141                }
1142            }
1143        } else {
1144            match storage.arbor().node_create_external(
1145                &config.head.tree_id,
1146                Some(config.head.node_id),
1147                user_handle,
1148                None,
1149            ).await {
1150                Ok(id) => id,
1151                Err(e) => {
1152                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1153                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1154                    return;
1155                }
1156            }
1157        };
1158
1159        let user_position = Position::new(config.head.tree_id, user_node_id);
1160
1161        // Track current parent for event node chain (Milestone 2)
1162        let mut current_parent = user_node_id;
1163
1164        // Create UserMessage event node (Milestone 2)
1165        let user_event = NodeEvent::UserMessage { content: prompt.clone() };
1166        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &user_event).await {
1167            current_parent = node_id;
1168        }
1169
1170        // Create AssistantStart event node (Milestone 2)
1171        let start_event = NodeEvent::AssistantStart;
1172        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &start_event).await {
1173            current_parent = node_id;
1174        }
1175
1176        // Update stream with user position
1177        let _ = storage.stream_set_user_position(&stream_id, user_position).await;
1178
1179        // 3. Push Start event
1180        let _ = storage.stream_push_event(&stream_id, ChatEvent::Start {
1181            id: session_id,
1182            user_position,
1183        }).await;
1184
1185        // 4. Build launch config
1186        let launch_config = LaunchConfig {
1187            query: prompt,
1188            session_id: config.claude_session_id.clone(),
1189            fork_session: false,
1190            model: config.model,
1191            working_dir: config.working_dir.clone(),
1192            system_prompt: config.system_prompt.clone(),
1193            mcp_config: config.mcp_config.clone(),
1194            loopback_enabled: config.loopback_enabled,
1195            loopback_session_id: if config.loopback_enabled {
1196                Some(session_id.to_string())
1197            } else {
1198                None
1199            },
1200            ..Default::default()
1201        };
1202
1203        // 5. Launch Claude and stream events to buffer
1204        let mut response_content = String::new();
1205        let mut claude_session_id = config.claude_session_id.clone();
1206        let mut cost_usd = None;
1207        let mut num_turns = None;
1208
1209        let mut raw_stream = executor.launch(launch_config).await;
1210
1211        // Track current tool use for streaming
1212        let mut current_tool_id: Option<String> = None;
1213        let mut current_tool_name: Option<String> = None;
1214        let mut current_tool_input = String::new();
1215
1216        while let Some(event) = raw_stream.next().await {
1217            match event {
1218                RawClaudeEvent::System { session_id: sid, .. } => {
1219                    if let Some(id) = sid {
1220                        claude_session_id = Some(id);
1221                    }
1222                }
1223                RawClaudeEvent::StreamEvent { event: inner, session_id: sid } => {
1224                    if let Some(id) = sid {
1225                        claude_session_id = Some(id);
1226                    }
1227                    match inner {
1228                        StreamEventInner::ContentBlockDelta { delta, .. } => {
1229                            match delta {
1230                                StreamDelta::TextDelta { text } => {
1231                                    response_content.push_str(&text);
1232
1233                                    // Create arbor node for text content (Milestone 2)
1234                                    let event = NodeEvent::ContentText { text: text.clone() };
1235                                    if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1236                                        current_parent = node_id;
1237                                    }
1238
1239                                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
1240                                }
1241                                StreamDelta::InputJsonDelta { partial_json } => {
1242                                    current_tool_input.push_str(&partial_json);
1243                                }
1244                            }
1245                        }
1246                        StreamEventInner::ContentBlockStart { content_block, .. } => {
1247                            if let Some(StreamContentBlock::ToolUse { id, name, .. }) = content_block {
1248                                current_tool_id = Some(id);
1249                                current_tool_name = Some(name);
1250                                current_tool_input.clear();
1251                            }
1252                        }
1253                        StreamEventInner::ContentBlockStop { .. } => {
1254                            if let (Some(id), Some(name)) = (current_tool_id.take(), current_tool_name.take()) {
1255                                let input: Value = serde_json::from_str(&current_tool_input)
1256                                    .unwrap_or(Value::Object(serde_json::Map::new()));
1257
1258                                // Check if this is a loopback_permit call (tool waiting for approval)
1259                                if name == "mcp__plexus__loopback_permit" {
1260                                    let _ = storage.stream_set_status(&stream_id, StreamStatus::AwaitingPermission, None).await;
1261                                }
1262
1263                                // Create arbor node for tool use (Milestone 2)
1264                                let event = NodeEvent::ContentToolUse {
1265                                    id: id.clone(),
1266                                    name: name.clone(),
1267                                    input: input.clone(),
1268                                };
1269                                if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1270                                    current_parent = node_id;
1271                                }
1272
1273                                let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1274                                    tool_name: name,
1275                                    tool_use_id: id,
1276                                    input,
1277                                }).await;
1278                                current_tool_input.clear();
1279                            }
1280                        }
1281                        StreamEventInner::MessageDelta { delta } => {
1282                            // If stop_reason is tool_use with loopback, mark as awaiting
1283                            if delta.stop_reason == Some("tool_use".to_string()) {
1284                                // Check if we're in loopback mode (already marked above)
1285                            }
1286                        }
1287                        _ => {}
1288                    }
1289                }
1290                RawClaudeEvent::Assistant { message } => {
1291                    if let Some(msg) = message {
1292                        if let Some(content) = msg.content {
1293                            for block in content {
1294                                match block {
1295                                    RawContentBlock::Text { text } => {
1296                                        if response_content.is_empty() {
1297                                            response_content.push_str(&text);
1298
1299                                            // Create arbor node for text content (Milestone 2)
1300                                            let event = NodeEvent::ContentText { text: text.clone() };
1301                                            if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1302                                                current_parent = node_id;
1303                                            }
1304
1305                                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Content { text }).await;
1306                                        }
1307                                    }
1308                                    RawContentBlock::ToolUse { id, name, input } => {
1309                                        // Create arbor node for tool use (Milestone 2)
1310                                        let event = NodeEvent::ContentToolUse {
1311                                            id: id.clone(),
1312                                            name: name.clone(),
1313                                            input: input.clone(),
1314                                        };
1315                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1316                                            current_parent = node_id;
1317                                        }
1318
1319                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolUse {
1320                                            tool_name: name,
1321                                            tool_use_id: id,
1322                                            input,
1323                                        }).await;
1324                                    }
1325                                    RawContentBlock::ToolResult { tool_use_id, content, is_error } => {
1326                                        // Tool completed - back to running if was awaiting
1327                                        let _ = storage.stream_set_status(&stream_id, StreamStatus::Running, None).await;
1328
1329                                        // Create arbor node for tool result (Milestone 2)
1330                                        let event = NodeEvent::UserToolResult {
1331                                            tool_use_id: tool_use_id.clone(),
1332                                            content: content.clone().unwrap_or_default(),
1333                                            is_error: is_error.unwrap_or(false),
1334                                        };
1335                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1336                                            current_parent = node_id;
1337                                        }
1338
1339                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::ToolResult {
1340                                            tool_use_id,
1341                                            output: content.unwrap_or_default(),
1342                                            is_error: is_error.unwrap_or(false),
1343                                        }).await;
1344                                    }
1345                                    RawContentBlock::Thinking { thinking, .. } => {
1346                                        // Create arbor node for thinking (Milestone 2)
1347                                        let event = NodeEvent::ContentThinking { thinking: thinking.clone() };
1348                                        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &event).await {
1349                                            current_parent = node_id;
1350                                        }
1351
1352                                        let _ = storage.stream_push_event(&stream_id, ChatEvent::Thinking { thinking }).await;
1353                                    }
1354                                }
1355                            }
1356                        }
1357                    }
1358                }
1359                RawClaudeEvent::Result {
1360                    session_id: sid,
1361                    cost_usd: cost,
1362                    num_turns: turns,
1363                    is_error,
1364                    error,
1365                    ..
1366                } => {
1367                    if let Some(id) = sid {
1368                        claude_session_id = Some(id);
1369                    }
1370                    cost_usd = cost;
1371                    num_turns = turns;
1372
1373                    if is_error == Some(true) {
1374                        if let Some(err_msg) = error {
1375                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: err_msg.clone() }).await;
1376                            let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(err_msg)).await;
1377                            return;
1378                        }
1379                    }
1380                }
1381                RawClaudeEvent::Unknown { event_type, data } => {
1382                    match storage.unknown_event_store(Some(&session_id), &event_type, &data).await {
1383                        Ok(handle) => {
1384                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough { event_type, handle, data }).await;
1385                        }
1386                        Err(_) => {
1387                            let _ = storage.stream_push_event(&stream_id, ChatEvent::Passthrough {
1388                                event_type,
1389                                handle: "storage-failed".to_string(),
1390                                data,
1391                            }).await;
1392                        }
1393                    }
1394                }
1395                RawClaudeEvent::User { .. } => {}
1396            }
1397        }
1398
1399        // 6. Store assistant response
1400        let model_id = format!("claude-code-{}", config.model.as_str());
1401        let assistant_msg = if is_ephemeral {
1402            match storage.message_create_ephemeral(
1403                &session_id,
1404                MessageRole::Assistant,
1405                response_content,
1406                Some(model_id),
1407                None,
1408                None,
1409                cost_usd,
1410            ).await {
1411                Ok(m) => m,
1412                Err(e) => {
1413                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1414                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1415                    return;
1416                }
1417            }
1418        } else {
1419            match storage.message_create(
1420                &session_id,
1421                MessageRole::Assistant,
1422                response_content,
1423                Some(model_id),
1424                None,
1425                None,
1426                cost_usd,
1427            ).await {
1428                Ok(m) => m,
1429                Err(e) => {
1430                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1431                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1432                    return;
1433                }
1434            }
1435        };
1436
1437        // Create AssistantComplete event node (Milestone 2)
1438        let complete_event = NodeEvent::AssistantComplete { usage: None };
1439        if let Ok(node_id) = create_event_node(storage.arbor(), &config.head.tree_id, &current_parent, &complete_event).await {
1440            current_parent = node_id;
1441        }
1442
1443        // 7. Create assistant node in Arbor
1444        let assistant_handle = ClaudeCodeStorage::message_to_handle(&assistant_msg, "assistant");
1445        let assistant_node_id = if is_ephemeral {
1446            match storage.arbor().node_create_external_ephemeral(
1447                &config.head.tree_id,
1448                Some(current_parent),
1449                assistant_handle,
1450                None,
1451            ).await {
1452                Ok(id) => id,
1453                Err(e) => {
1454                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1455                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1456                    return;
1457                }
1458            }
1459        } else {
1460            match storage.arbor().node_create_external(
1461                &config.head.tree_id,
1462                Some(current_parent),
1463                assistant_handle,
1464                None,
1465            ).await {
1466                Ok(id) => id,
1467                Err(e) => {
1468                    let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1469                    let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1470                    return;
1471                }
1472            }
1473        };
1474
1475        let new_head = Position::new(config.head.tree_id, assistant_node_id);
1476
1477        // 8. Update session head (skip for ephemeral)
1478        if !is_ephemeral {
1479            if let Err(e) = storage.session_update_head(&session_id, assistant_node_id, claude_session_id.clone()).await {
1480                let _ = storage.stream_push_event(&stream_id, ChatEvent::Err { message: e.to_string() }).await;
1481                let _ = storage.stream_set_status(&stream_id, StreamStatus::Failed, Some(e.to_string())).await;
1482                return;
1483            }
1484        }
1485
1486        // 9. Push Complete event and mark stream as complete
1487        let _ = storage.stream_push_event(&stream_id, ChatEvent::Complete {
1488            new_head: if is_ephemeral { config.head } else { new_head },
1489            claude_session_id: claude_session_id.unwrap_or_default(),
1490            usage: Some(ChatUsage {
1491                input_tokens: None,
1492                output_tokens: None,
1493                cost_usd,
1494                num_turns,
1495            }),
1496        }).await;
1497
1498        let _ = storage.stream_set_status(&stream_id, StreamStatus::Complete, None).await;
1499    }
1500}