//! plexus_substrate/activations/orcha/activation.rs
//!
//! Orcha activation: full task orchestration with approval loops and validation.

1use super::graph_runner;
2use super::graph_runtime::GraphRuntime;
3use super::orchestrator::run_orchestration_task;
4use super::pm;
5use super::storage::OrchaStorage;
6use super::ticket_compiler;
7use super::types::*;
8use crate::activations::claudecode::{ClaudeCode, Model};
9use crate::activations::claudecode_loopback::ClaudeCodeLoopback;
10use crate::plexus::{Activation, ChildRouter, ChildSummary, HubContext, NoParent, PlexusError, PlexusStream};
11use async_stream::stream;
12use async_trait::async_trait;
13use futures::Stream;
14use futures::StreamExt;
15use plexus_macros::activation;
16use serde_json::Value;
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use tokio::process::Command;
21use uuid::Uuid;
22
/// Registry of cancellation senders keyed by graph_id.
///
/// When `cancel_graph` is called, the sender's value is set to `true`, which all
/// running node tasks observe via their cloned `Receiver<bool>` and exit early.
///
/// Entries are inserted by `register_cancel` (and by the recovery spawn path) and
/// removed either via `unregister_cancel` or when a graph's execution task finishes.
type CancelRegistry = Arc<tokio::sync::Mutex<HashMap<String, tokio::sync::watch::Sender<bool>>>>;
28
/// Orcha activation - Full orchestration with approval loops and validation
///
/// Provides both full orchestration (run_task) and coordination helpers.
#[derive(Clone)]
pub struct Orcha<P: HubContext = NoParent> {
    /// Orchestration session persistence (create/get/update/delete, retries, agents).
    storage: Arc<OrchaStorage>,
    /// Claude Code activation; used for graph node execution and status summaries.
    claudecode: Arc<ClaudeCode<P>>,
    /// Loopback into claudecode — passed through to the orchestrator and graph runner.
    loopback: Arc<ClaudeCodeLoopback>,
    /// Arbor tree storage; backs monitor trees and conversation rendering.
    arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
    /// Lattice graph runtime (open_graph / storage access).
    graph_runtime: Arc<GraphRuntime>,
    /// PM subactivation — the only routed child (see `plugin_children`/`get_child`).
    pm: Arc<pm::Pm>,
    /// Cancellation registry: graph_id → watch sender (true = cancel).
    cancel_registry: CancelRegistry,
    /// Marker tying the parent hub context type parameter into this struct.
    _phantom: PhantomData<P>,
}
44
45impl<P: HubContext> Orcha<P> {
46    /// Create a new Orcha activation
47    pub fn new(
48        storage: Arc<OrchaStorage>,
49        claudecode: Arc<ClaudeCode<P>>,
50        loopback: Arc<ClaudeCodeLoopback>,
51        arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
52        graph_runtime: Arc<GraphRuntime>,
53        pm: Arc<pm::Pm>,
54    ) -> Self {
55        Self {
56            storage,
57            claudecode,
58            loopback,
59            arbor_storage,
60            graph_runtime,
61            pm,
62            cancel_registry: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
63            _phantom: PhantomData,
64        }
65    }
66
67    /// Register a new cancellation token for a graph, returning the receiver.
68    ///
69    /// If a token already exists for this graph_id, it is replaced.
70    async fn register_cancel(&self, graph_id: &str) -> tokio::sync::watch::Receiver<bool> {
71        let (tx, rx) = tokio::sync::watch::channel(false);
72        self.cancel_registry.lock().await.insert(graph_id.to_string(), tx);
73        rx
74    }
75
76    /// Remove the cancellation token for a graph (called on normal completion/failure).
77    #[allow(dead_code)]
78    async fn unregister_cancel(&self, graph_id: &str) {
79        self.cancel_registry.lock().await.remove(graph_id);
80    }
81
82    /// Child plugin summaries — pm is the only subactivation.
83    pub fn plugin_children(&self) -> Vec<ChildSummary> {
84        let schema = Activation::plugin_schema(&*self.pm);
85        vec![ChildSummary {
86            namespace: schema.namespace,
87            description: schema.description,
88            hash: schema.hash,
89        }]
90    }
91
92    /// Best-effort startup recovery for graphs that were running when the substrate
93    /// last shut down.
94    ///
95    /// For each graph that is tracked by PM and still has `status = 'running'` in the
96    /// lattice DB:
97    ///   1. Any node stuck in `running` is marked failed with
98    ///      "interrupted: substrate restarted" so the lattice can propagate the error.
99    ///   2. Any node stuck in `ready` has a fresh NodeReady event emitted so the new
100    ///      watcher can pick it up.
101    ///   3. A new `run_graph_execution` task is spawned for the graph, reconnecting
102    ///      Orcha's dispatch logic to the live event stream.
103    ///
104    /// This is fire-and-forget: errors are logged and skipped, never propagated.
105    pub async fn recover_running_graphs(&self)
106    where
107        P: 'static,
108    {
109        use crate::activations::lattice::NodeStatus;
110        use futures::StreamExt;
111
112        let lattice_storage = self.graph_runtime.storage();
113
114        // Find all graph IDs known to PM (started via run_tickets / build_tickets).
115        let pm_graph_ids = match self.pm.list_all_graph_ids().await {
116            Ok(ids) => ids,
117            Err(e) => {
118                tracing::warn!("recovery: failed to list PM graph IDs: {}", e);
119                return;
120            }
121        };
122
123        // Find which of those are currently 'running' in the lattice.
124        let running_ids = match lattice_storage.get_running_graph_ids().await {
125            Ok(ids) => ids,
126            Err(e) => {
127                tracing::warn!("recovery: failed to query running graphs: {}", e);
128                return;
129            }
130        };
131
132        // Intersect: only recover graphs known to PM.
133        let pm_set: std::collections::HashSet<String> = pm_graph_ids.into_iter().collect();
134        let to_recover: Vec<String> = running_ids
135            .into_iter()
136            .filter(|id| pm_set.contains(id))
137            .collect();
138
139        if to_recover.is_empty() {
140            tracing::debug!("recovery: no running PM graphs to recover");
141            return;
142        }
143
144        tracing::info!("recovery: recovering {} running graph(s)", to_recover.len());
145
146        for graph_id in to_recover {
147            let storage = lattice_storage.clone();
148            let graph_id_clone = graph_id.clone();
149
150            // Step 1: inspect nodes and fix up state.
151            let nodes = match storage.get_nodes(&graph_id).await {
152                Ok(n) => n,
153                Err(e) => {
154                    tracing::warn!("recovery: get_nodes({}) failed: {}", graph_id, e);
155                    continue;
156                }
157            };
158
159            for node in &nodes {
160                match node.status {
161                    NodeStatus::Running => {
162                        tracing::info!(
163                            "recovery: re-dispatching interrupted node {} in graph {}",
164                            node.id, graph_id
165                        );
166                        if let Err(e) = storage.reset_running_to_ready(&graph_id, &node.id).await {
167                            tracing::warn!(
168                                "recovery: reset_running_to_ready failed for node {} in {}: {}",
169                                node.id, graph_id, e
170                            );
171                        }
172                    }
173                    NodeStatus::Ready => {
174                        // Re-emit a NodeReady event so the fresh watcher dispatches it.
175                        tracing::info!(
176                            "recovery: re-emitting NodeReady for node {} in graph {}",
177                            node.id, graph_id
178                        );
179                        if let Err(e) = storage.reemit_ready_nodes(&graph_id).await {
180                            tracing::warn!(
181                                "recovery: reemit_ready_nodes({}) failed: {}",
182                                graph_id, e
183                            );
184                        }
185                        // reemit handles all ready nodes at once; break out of per-node loop.
186                        break;
187                    }
188                    _ => {} // Pending / Complete / Failed — no action needed.
189                }
190            }
191
192            // Step 2: spawn run_graph_execution to re-attach the dispatch watcher.
193            let graph = Arc::new(self.graph_runtime.open_graph(graph_id_clone.clone()));
194            let cc = self.claudecode.clone();
195            let arbor = self.arbor_storage.clone();
196            let lb = self.loopback.storage();
197            let cancel_registry = self.cancel_registry.clone();
198            let pm_for_recovery = self.pm.clone();
199            let graph_runtime_recovery = self.graph_runtime.clone();
200
201            // Load persisted run config from graph metadata.
202            let graph_meta = lattice_storage.get_graph(&graph_id_clone).await.ok()
203                .and_then(|g| g.metadata.get("_plexus_run_config").cloned());
204
205            let model_enum = graph_meta.as_ref()
206                .and_then(|c| c.get("model"))
207                .and_then(|m| m.as_str())
208                .map(|s| match s {
209                    "opus" => crate::activations::claudecode::Model::Opus,
210                    "haiku" => crate::activations::claudecode::Model::Haiku,
211                    _ => crate::activations::claudecode::Model::Sonnet,
212                })
213                .unwrap_or(crate::activations::claudecode::Model::Sonnet);
214
215            let working_directory = graph_meta.as_ref()
216                .and_then(|c| c.get("working_directory"))
217                .and_then(|w| w.as_str())
218                .unwrap_or("/workspace")
219                .to_string();
220
221            // Load persisted ticket map and invert to node_id → ticket_id.
222            let node_to_ticket: std::collections::HashMap<String, String> = pm_for_recovery
223                .get_ticket_map(&graph_id_clone)
224                .await
225                .unwrap_or_default()
226                .into_iter()
227                .map(|(ticket_id, node_id)| (node_id, ticket_id))
228                .collect();
229
230            tokio::spawn(async move {
231                tracing::info!("recovery: spawning run_graph_execution for {}", graph_id_clone);
232                // Register a cancel token so this recovered graph can be cancelled.
233                let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
234                cancel_registry.lock().await.insert(graph_id_clone.clone(), cancel_tx);
235
236                let execution = graph_runner::run_graph_execution(
237                    graph,
238                    cc,
239                    arbor,
240                    lb,
241                    pm_for_recovery,
242                    graph_runtime_recovery,
243                    cancel_registry.clone(),
244                    model_enum,
245                    working_directory,
246                    cancel_rx,
247                    node_to_ticket,
248                );
249                tokio::pin!(execution);
250                while let Some(_event) = execution.next().await {}
251                cancel_registry.lock().await.remove(&graph_id_clone);
252                tracing::info!("recovery: graph {} execution complete", graph_id_clone);
253            });
254        }
255    }
256}
257
258/// Internal helper: watch one graph's lattice events and forward them as OrchaEvents into `tx`.
259///
260/// Used by `watch_graph_tree` to multiplex root + all child graphs into one channel.
261async fn watch_single_graph(
262    gid: String,
263    after_seq: Option<u64>,
264    graph_runtime: Arc<GraphRuntime>,
265    pm: Arc<pm::Pm>,
266    tx: tokio::sync::mpsc::UnboundedSender<OrchaEvent>,
267) {
268    let graph = graph_runtime.open_graph(gid.clone());
269    let node_to_ticket: HashMap<String, String> = pm
270        .get_ticket_map(&gid)
271        .await
272        .unwrap_or_default()
273        .into_iter()
274        .map(|(ticket_id, node_id)| (node_id, ticket_id))
275        .collect();
276
277    let total_nodes = graph.count_nodes().await.unwrap_or(0);
278    let mut complete_nodes: usize = 0;
279
280    fn calc_pct(complete: usize, total: usize) -> Option<u32> {
281        if total == 0 {
282            None
283        } else {
284            Some((complete as f32 / total as f32 * 100.0) as u32)
285        }
286    }
287
288    let event_stream = graph.watch(after_seq);
289    tokio::pin!(event_stream);
290
291    while let Some(crate::activations::lattice::LatticeEventEnvelope { event, .. }) =
292        event_stream.next().await
293    {
294        let evt = match event {
295            crate::activations::lattice::LatticeEvent::NodeReady { node_id, .. } => {
296                let ticket_id = node_to_ticket.get(&node_id).cloned();
297                Some(OrchaEvent::NodeStarted {
298                    node_id,
299                    label: None,
300                    ticket_id,
301                    percentage: calc_pct(complete_nodes, total_nodes),
302                })
303            }
304            crate::activations::lattice::LatticeEvent::NodeStarted { .. } => None,
305            crate::activations::lattice::LatticeEvent::NodeDone { node_id, .. } => {
306                complete_nodes += 1;
307                let ticket_id = node_to_ticket.get(&node_id).cloned();
308                Some(OrchaEvent::NodeComplete {
309                    node_id,
310                    label: None,
311                    ticket_id,
312                    output_summary: None,
313                    percentage: calc_pct(complete_nodes, total_nodes),
314                })
315            }
316            crate::activations::lattice::LatticeEvent::NodeFailed { node_id, error } => {
317                complete_nodes += 1;
318                let ticket_id = node_to_ticket.get(&node_id).cloned();
319                Some(OrchaEvent::NodeFailed {
320                    node_id,
321                    label: None,
322                    ticket_id,
323                    error,
324                    percentage: calc_pct(complete_nodes, total_nodes),
325                })
326            }
327            crate::activations::lattice::LatticeEvent::GraphDone { graph_id } => {
328                Some(OrchaEvent::Complete { session_id: graph_id })
329            }
330            crate::activations::lattice::LatticeEvent::GraphFailed {
331                graph_id,
332                node_id,
333                error,
334            } => Some(OrchaEvent::Failed {
335                session_id: graph_id,
336                error: format!("Node {} failed: {}", node_id, error),
337            }),
338        };
339        if let Some(e) = evt {
340            if tx.send(e).is_err() {
341                break;
342            }
343        }
344    }
345}
346
347#[async_trait]
348impl<P: HubContext + 'static> ChildRouter for Orcha<P> {
349    fn router_namespace(&self) -> &str {
350        "orcha"
351    }
352
353    async fn router_call(&self, method: &str, params: Value, auth: Option<&plexus_core::plexus::AuthContext>, raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
354            Activation::call(self, method, params, auth, raw_ctx).await
355        }
356
357    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
358        if name == "pm" {
359            Some(Box::new((*self.pm).clone()))
360        } else {
361            None
362        }
363    }
364}
365
366#[plexus_macros::activation(namespace = "orcha",
367version = "1.0.0",
368description = "Full task orchestration with approval loops and validation",
369hub, crate_path = "plexus_core")]
370impl<P: HubContext> Orcha<P> {
371    /// Run a complete orchestration task
372    ///
373    /// This is the main entry point for running tasks with the full orcha pattern:
374    /// - Creates sessions
375    /// - Runs task with approval handling
376    /// - Extracts and executes validation
377    /// - Auto-retries on failure
378    #[plexus_macros::method]
379    async fn run_task(
380        &self,
381        request: RunTaskRequest,
382    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
383        run_orchestration_task(
384            self.storage.clone(),
385            self.arbor_storage.clone(),
386            self.claudecode.clone(),
387            self.loopback.clone(),
388            request,
389            None, // Let orchestrator generate session_id
390        ).await
391    }
392    /// Create a new orchestration session
393    ///
394    /// Creates a session record to track orchestration state. The client should
395    /// then create a corresponding claudecode session with loopback enabled.
396    #[plexus_macros::method]
397    async fn create_session(
398        &self,
399        request: CreateSessionRequest,
400    ) -> impl Stream<Item = CreateSessionResult> + Send + 'static {
401        let storage = self.storage.clone();
402
403        stream! {
404            // Generate unique session ID
405            let session_id = format!("orcha-{}", Uuid::new_v4());
406
407            // Determine agent mode
408            let agent_mode = if request.multi_agent {
409                AgentMode::Multi
410            } else {
411                AgentMode::Single
412            };
413
414            // Create session in storage
415            let session_result = storage.create_session(
416                session_id.clone(),
417                request.model.clone(),
418                request.working_directory.clone(),
419                request.rules.clone(),
420                request.max_retries,
421                agent_mode,
422                None, // tree_id (created by run_orchestration_task)
423            ).await;
424
425            match session_result {
426                Ok(session) => {
427                    yield CreateSessionResult::Ok {
428                        session_id,
429                        created_at: session.created_at,
430                    };
431                }
432                Err(e) => {
433                    yield CreateSessionResult::Err {
434                        message: format!("Failed to create session: {}", e),
435                    };
436                }
437            }
438        }
439    }
440
441    /// Update session state
442    ///
443    /// Called by the client to update the current state of the session
444    #[plexus_macros::method]
445    async fn update_session_state(
446        &self,
447        session_id: SessionId,
448        state: SessionState,
449    ) -> impl Stream<Item = UpdateSessionStateResult> + Send + 'static {
450        let storage = self.storage.clone();
451
452        stream! {
453            match storage.update_state(&session_id, state).await {
454                Ok(_) => {
455                    yield UpdateSessionStateResult::Ok;
456                }
457                Err(e) => {
458                    yield UpdateSessionStateResult::Err {
459                        message: format!("Failed to update state: {}", e),
460                    };
461                }
462            }
463        }
464    }
465
466    /// Get session information
467    #[plexus_macros::method]
468    async fn get_session(
469        &self,
470        request: GetSessionRequest,
471    ) -> impl Stream<Item = GetSessionResult> + Send + 'static {
472        let storage = self.storage.clone();
473
474        stream! {
475            match storage.get_session(&request.session_id).await {
476                Ok(session) => {
477                    yield GetSessionResult::Ok { session };
478                }
479                Err(e) => {
480                    yield GetSessionResult::Err {
481                        message: format!("Session not found: {}", e),
482                    };
483                }
484            }
485        }
486    }
487
488    /// Extract validation artifact from text
489    ///
490    /// Scans text for {"orcha_validate": {...}} pattern and extracts test command
491    #[plexus_macros::method]
492    async fn extract_validation(
493        &self,
494        text: String,
495    ) -> impl Stream<Item = ExtractValidationResult> + Send + 'static {
496        stream! {
497            match extract_validation_artifact(&text) {
498                Some(artifact) => {
499                    yield ExtractValidationResult::Ok { artifact };
500                }
501                None => {
502                    yield ExtractValidationResult::NotFound;
503                }
504            }
505        }
506    }
507
508    /// Run a validation test
509    ///
510    /// Executes a test command and returns the result
511    #[plexus_macros::method]
512    async fn run_validation(
513        &self,
514        artifact: ValidationArtifact,
515    ) -> impl Stream<Item = RunValidationResult> + Send + 'static {
516        stream! {
517            let result = run_validation_test(&artifact).await;
518
519            yield RunValidationResult::Ok { result };
520        }
521    }
522
523    /// Increment retry counter for a session
524    ///
525    /// Called when validation fails and the client wants to retry
526    #[plexus_macros::method]
527    async fn increment_retry(
528        &self,
529        session_id: SessionId,
530    ) -> impl Stream<Item = IncrementRetryResult> + Send + 'static {
531        let storage = self.storage.clone();
532
533        stream! {
534            match storage.increment_retry(&session_id).await {
535                Ok(count) => {
536                    let max_retries = match storage.get_session(&session_id).await {
537                        Ok(s) => s.max_retries,
538                        Err(e) => {
539                            tracing::warn!("Failed to get session {} for max_retries lookup: {}", session_id, e);
540                            3
541                        }
542                    };
543
544                    yield IncrementRetryResult::Ok {
545                        retry_count: count,
546                        max_retries,
547                        exceeded: count >= max_retries,
548                    };
549                }
550                Err(e) => {
551                    yield IncrementRetryResult::Err {
552                        message: format!("Failed to increment retry: {}", e),
553                    };
554                }
555            }
556        }
557    }
558
559    /// List all sessions
560    #[plexus_macros::method]
561    async fn list_sessions(&self) -> impl Stream<Item = ListSessionsResult> + Send + 'static {
562        let storage = self.storage.clone();
563
564        stream! {
565            let sessions = storage.list_sessions().await;
566            yield ListSessionsResult::Ok { sessions };
567        }
568    }
569
570    /// Delete a session
571    #[plexus_macros::method]
572    async fn delete_session(
573        &self,
574        session_id: SessionId,
575    ) -> impl Stream<Item = DeleteSessionResult> + Send + 'static {
576        let storage = self.storage.clone();
577
578        stream! {
579            match storage.delete_session(&session_id).await {
580                Ok(_) => {
581                    yield DeleteSessionResult::Ok;
582                }
583                Err(e) => {
584                    yield DeleteSessionResult::Err {
585                        message: format!("Failed to delete session: {}", e),
586                    };
587                }
588            }
589        }
590    }
591
592    /// Run a task asynchronously - returns immediately with session_id
593    ///
594    /// Like run_task but non-blocking. Returns the session_id immediately
595    /// and the task runs in the background. Use check_status or get_session
596    /// to check on progress.
597    #[plexus_macros::method]
598    async fn run_task_async(
599        &self,
600        request: RunTaskRequest,
601    ) -> impl Stream<Item = RunTaskAsyncResult> + Send + 'static {
602        let storage = self.storage.clone();
603        let arbor_storage = self.arbor_storage.clone();
604        let claudecode = self.claudecode.clone();
605        let loopback = self.loopback.clone();
606
607        stream! {
608            // Generate session ID that will be used by the orchestrator
609            let session_id = format!("orcha-{}", Uuid::new_v4());
610            let session_id_for_spawn = session_id.clone();
611
612            // Spawn the orchestration task in the background
613            let req = request.clone();
614            tokio::spawn(async move {
615                let stream = run_orchestration_task(
616                    storage,
617                    arbor_storage,
618                    claudecode,
619                    loopback,
620                    req,
621                    Some(session_id_for_spawn), // Pass the session_id to orchestrator
622                ).await;
623                tokio::pin!(stream);
624
625                // Consume the stream in the background
626                while let Some(_event) = stream.next().await {
627                    // Events are discarded in async mode
628                    // Use get_session or check_status to monitor
629                }
630            });
631
632            // Return immediately with session_id
633            yield RunTaskAsyncResult::Ok { session_id };
634        }
635    }
636
637    /// List all orcha monitor trees
638    ///
639    /// Returns all arbor trees created by orcha for monitoring sessions
640    #[plexus_macros::method]
641    async fn list_monitor_trees(
642        &self,
643    ) -> impl Stream<Item = ListMonitorTreesResult> + Send + 'static {
644        let arbor_storage = self.arbor_storage.clone();
645
646        stream! {
647            // Query arbor for trees with metadata type="orcha_monitor"
648            let filter = serde_json::json!({"type": "orcha_monitor"});
649
650            match arbor_storage.tree_query_by_metadata(&filter).await {
651                Ok(tree_ids) => {
652                    let mut trees = Vec::new();
653
654                    // Get metadata for each tree
655                    for tree_id in tree_ids {
656                        if let Ok(tree) = arbor_storage.tree_get(&tree_id).await {
657                            if let Some(metadata) = &tree.metadata {
658                                let session_id = metadata.get("session_id")
659                                    .and_then(|v| v.as_str())
660                                    .unwrap_or("unknown")
661                                    .to_string();
662                                let tree_path = metadata.get("tree_path")
663                                    .and_then(|v| v.as_str())
664                                    .unwrap_or("unknown")
665                                    .to_string();
666
667                                trees.push(MonitorTreeInfo {
668                                    tree_id: tree.id.to_string(),
669                                    session_id,
670                                    tree_path,
671                                });
672                            }
673                        }
674                    }
675
676                    yield ListMonitorTreesResult::Ok { trees };
677                }
678                Err(_) => {
679                    yield ListMonitorTreesResult::Ok { trees: vec![] };
680                }
681            }
682        }
683    }
684
685    /// Check status of a running session by asking Claude to summarize
686    ///
687    /// Creates an ephemeral forked session to generate a summary of what's happening,
688    /// and saves the summary to an arbor monitoring tree for historical tracking.
689    #[plexus_macros::method]
690    async fn check_status(
691        &self,
692        request: CheckStatusRequest,
693    ) -> impl Stream<Item = CheckStatusResult> + Send + 'static {
694        let claudecode = self.claudecode.clone();
695        let arbor_storage = self.arbor_storage.clone();
696        let storage = self.storage.clone();
697        let session_id = request.session_id.clone();
698
699        stream! {
700            // First, get the actual session state from storage
701            let session_info = match storage.get_session(&session_id).await {
702                Ok(info) => info,
703                Err(e) => {
704                    yield CheckStatusResult::Err {
705                        message: format!("Session not found: {}", e),
706                    };
707                    return;
708                }
709            };
710
711            // Branch based on agent mode
712            if session_info.agent_mode == AgentMode::Multi {
713                // Multi-agent status check
714                let agents = match storage.list_agents(&session_id).await {
715                    Ok(a) => a,
716                    Err(e) => {
717                        yield CheckStatusResult::Err {
718                            message: format!("Failed to list agents: {}", e),
719                        };
720                        return;
721                    }
722                };
723
724                if agents.is_empty() {
725                    yield CheckStatusResult::Err {
726                        message: "No agents found in session".to_string(),
727                    };
728                    return;
729                }
730
731                // Generate summaries for each agent in parallel
732                let summary_futures: Vec<_> = agents.iter().map(|agent| {
733                    generate_agent_summary(&claudecode, &arbor_storage, agent.clone())
734                }).collect();
735
736                let agent_summaries: Vec<AgentSummary> = futures::future::join_all(summary_futures)
737                    .await
738                    .into_iter()
739                    .filter_map(|r| match r {
740                        Ok(summary) => Some(summary),
741                        Err(e) => {
742                            tracing::warn!("Failed to generate agent summary: {}", e);
743                            None
744                        }
745                    })
746                    .collect();
747
748                // Generate overall meta-summary
749                let overall_summary = generate_overall_summary(
750                    &claudecode,
751                    &session_id,
752                    &agent_summaries,
753                ).await;
754
755                let summary = overall_summary.unwrap_or_else(|| "Unable to generate summary".to_string());
756
757                // Save to arbor monitoring tree
758                match save_status_summary_to_arbor(&arbor_storage, &session_id, &summary).await {
759                    Ok(_) => {
760                        yield CheckStatusResult::Ok {
761                            summary,
762                            agent_summaries,
763                        };
764                    }
765                    Err(e) => {
766                        tracing::warn!("Failed to save summary to arbor: {}", e);
767                        yield CheckStatusResult::Ok {
768                            summary,
769                            agent_summaries,
770                        };
771                    }
772                }
773
774                return;
775            }
776
777            // Single-agent mode (original logic below)
778
779
780            // Format session state as context for Claude and extract stream_id for arbor lookup
781            let (state_description, stream_id_opt) = match &session_info.state {
782                SessionState::Idle => ("idle (not currently executing)".to_string(), None),
783                SessionState::Running { stream_id, sequence, active_agents, completed_agents, failed_agents } => {
784                    let agent_info = if *active_agents > 0 || *completed_agents > 0 || *failed_agents > 0 {
785                        format!(" (agents: {} active, {} complete, {} failed)", active_agents, completed_agents, failed_agents)
786                    } else {
787                        String::new()
788                    };
789                    (format!("running (stream: {}, sequence: {}{})", stream_id, sequence, agent_info), Some(stream_id.clone()))
790                }
791                SessionState::WaitingApproval { approval_id } => {
792                    (format!("waiting for approval (approval_id: {})", approval_id), None)
793                }
794                SessionState::Validating { test_command } => {
795                    (format!("validating with command: {}", test_command), None)
796                }
797                SessionState::Complete => ("completed successfully".to_string(), None),
798                SessionState::Failed { error } => {
799                    (format!("failed with error: {}", error), None)
800                }
801            };
802
803            // Try to get the conversation tree from claudecode if we have a stream_id
804            let conversation_context = if let Some(stream_id) = stream_id_opt {
805                // Get the claudecode session to find its arbor tree
806                match claudecode.storage.session_get_by_name(&stream_id).await {
807                    Ok(cc_session) => {
808                        // Get and render the arbor tree as a formatted conversation
809                        match arbor_storage.tree_get(&cc_session.head.tree_id).await {
810                            Ok(tree) => {
811                                let formatted = format_conversation_from_tree(&tree);
812                                Some(formatted)
813                            }
814                            Err(e) => {
815                                tracing::warn!("Failed to get arbor tree for claudecode session {}: {}", stream_id, e);
816                                None
817                            }
818                        }
819                    }
820                    Err(e) => {
821                        tracing::warn!("Failed to get claudecode session {}: {}", stream_id, e);
822                        None
823                    }
824                }
825            } else {
826                None
827            };
828
829            // Create an ephemeral session to ask for a summary
830            let summary_session = format!("orcha-check-{}", Uuid::new_v4());
831            let summary_session_id = format!("{}-check-{}", session_id, Uuid::new_v4());
832
833            // Create the session - using Haiku for fast, cheap summaries
834            let create_stream = claudecode.create(
835                summary_session.clone(),
836                "/workspace".to_string(), // Default, doesn't matter for ephemeral
837                crate::activations::claudecode::Model::Haiku,
838                None,
839                Some(false), // No loopback needed for summary
840                Some(summary_session_id), // Track ephemeral session under parent
841            ).await;
842            tokio::pin!(create_stream);
843
844            let mut create_ok = false;
845            while let Some(result) = create_stream.next().await {
846                if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
847                    create_ok = true;
848                }
849            }
850
851            if !create_ok {
852                yield CheckStatusResult::Err {
853                    message: "Failed to create summary session".to_string(),
854                };
855                return;
856            }
857
858            // Ask Claude to summarize the session with actual context
859            let prompt = if let Some(conversation) = conversation_context {
860                format!(
861                    "An orcha orchestration session has the following status:\n\n\
862                     - Session ID: {}\n\
863                     - Model: {}\n\
864                     - State: {}\n\
865                     - Retry count: {}/{}\n\
866                     - Created at: {} (unix timestamp)\n\
867                     - Last activity: {} (unix timestamp)\n\n\
868                     Here is the actual conversation tree showing what the agent has been doing:\n\n\
869                     {}\n\n\
870                     Generate a brief, natural language summary (2-3 sentences) of what's happening in this session.\n\
871                     Focus on what the agent is currently doing or has accomplished. Be specific about the task.",
872                    session_id,
873                    session_info.model,
874                    state_description,
875                    session_info.retry_count,
876                    session_info.max_retries,
877                    session_info.created_at,
878                    session_info.last_activity,
879                    conversation
880                )
881            } else {
882                format!(
883                    "An orcha orchestration session has the following status:\n\n\
884                     - Session ID: {}\n\
885                     - Model: {}\n\
886                     - State: {}\n\
887                     - Retry count: {}/{}\n\
888                     - Created at: {} (unix timestamp)\n\
889                     - Last activity: {} (unix timestamp)\n\n\
890                     Generate a brief, natural language summary (2-3 sentences) of what's happening in this session.\n\
891                     Focus on the current state and what the agent is doing or has done.",
892                    session_id,
893                    session_info.model,
894                    state_description,
895                    session_info.retry_count,
896                    session_info.max_retries,
897                    session_info.created_at,
898                    session_info.last_activity
899                )
900            };
901
902            let chat_stream = claudecode.chat(
903                summary_session.clone(),
904                prompt,
905                Some(true), // Ephemeral - don't save to history
906                None,
907            ).await;
908            tokio::pin!(chat_stream);
909
910            let mut summary = String::new();
911            while let Some(event) = chat_stream.next().await {
912                if let crate::activations::claudecode::ChatEvent::Content { text } = event {
913                    summary.push_str(&text);
914                }
915            }
916
917            if summary.is_empty() {
918                yield CheckStatusResult::Err {
919                    message: "Failed to generate summary".to_string(),
920                };
921            } else {
922                // Save summary to arbor monitoring tree
923                match save_status_summary_to_arbor(&arbor_storage, &session_id, &summary).await {
924                    Ok(_) => {
925                        yield CheckStatusResult::Ok {
926                            summary,
927                            agent_summaries: vec![],  // Single-agent mode
928                        };
929                    }
930                    Err(e) => {
931                        // Still return the summary even if arbor save fails
932                        tracing::warn!("Failed to save summary to arbor: {}", e);
933                        yield CheckStatusResult::Ok {
934                            summary,
935                            agent_summaries: vec![],  // Single-agent mode
936                        };
937                    }
938                }
939            }
940        }
941    }
942
943    /// Spawn a new agent in an existing session (multi-agent mode)
944    ///
945    /// Creates a new ClaudeCode session and tracks it as an agent within the orcha session.
946    /// Can be called explicitly via API or by agents themselves requesting helpers.
947    #[plexus_macros::method]
948    async fn spawn_agent(
949        &self,
950        request: SpawnAgentRequest,
951    ) -> impl Stream<Item = SpawnAgentResult> + Send + 'static {
952        let storage = self.storage.clone();
953        let claudecode = self.claudecode.clone();
954        let loopback = self.loopback.clone();
955
956        stream! {
957            // Verify session exists and is in multi-agent mode
958            let session = match storage.get_session(&request.session_id).await {
959                Ok(s) => s,
960                Err(e) => {
961                    yield SpawnAgentResult::Err {
962                        message: format!("Session not found: {}", e),
963                    };
964                    return;
965                }
966            };
967
968            if session.agent_mode != AgentMode::Multi {
969                yield SpawnAgentResult::Err {
970                    message: "Session is not in multi-agent mode".to_string(),
971                };
972                return;
973            }
974
975            // Parse model
976            let model = match session.model.as_str() {
977                "opus" => crate::activations::claudecode::Model::Opus,
978                "sonnet" => crate::activations::claudecode::Model::Sonnet,
979                "haiku" => crate::activations::claudecode::Model::Haiku,
980                _ => crate::activations::claudecode::Model::Sonnet,
981            };
982
983            // Create ClaudeCode session for this agent
984            let cc_session_name = format!("orcha-agent-{}", Uuid::new_v4());
985            let agent_session_id = format!("{}-agent-{}", session.session_id, Uuid::new_v4());
986
987            let create_stream = claudecode.create(
988                cc_session_name.clone(),
989                "/workspace".to_string(),  // TODO: Get from session
990                model.clone(),
991                None,
992                Some(true), // Loopback enabled
993                Some(agent_session_id), // Track agent under parent session
994            ).await;
995            tokio::pin!(create_stream);
996
997            let mut create_ok = false;
998            while let Some(result) = create_stream.next().await {
999                if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
1000                    create_ok = true;
1001                    break;
1002                }
1003            }
1004
1005            if !create_ok {
1006                yield SpawnAgentResult::Err {
1007                    message: "Failed to create ClaudeCode session".to_string(),
1008                };
1009                return;
1010            }
1011
1012            // Create agent record
1013            match storage.create_agent(
1014                &request.session_id,
1015                cc_session_name.clone(),
1016                request.subtask.clone(),
1017                false, // Not primary
1018                request.parent_agent_id,
1019            ).await {
1020                Ok(agent) => {
1021                    // Spawn background task to run this agent
1022                    let config = super::orchestrator::AgentConfig {
1023                        model,
1024                        working_directory: "/workspace".to_string(),
1025                        max_retries: session.max_retries,
1026                        task_context: request.subtask.clone(),
1027                        auto_approve: true, // TODO: Store in session and retrieve
1028                    };
1029
1030                    super::orchestrator::spawn_agent_task(
1031                        storage.clone(),
1032                        claudecode.clone(),
1033                        loopback.clone(),
1034                        agent.clone(),
1035                        request.subtask.clone(),
1036                        config,
1037                    );
1038
1039                    yield SpawnAgentResult::Ok {
1040                        agent_id: agent.agent_id,
1041                        claudecode_session_id: cc_session_name,
1042                    };
1043                }
1044                Err(e) => {
1045                    yield SpawnAgentResult::Err {
1046                        message: format!("Failed to create agent: {}", e),
1047                    };
1048                }
1049            }
1050        }
1051    }
1052
1053    /// List all agents in a session
1054    #[plexus_macros::method]
1055    async fn list_agents(
1056        &self,
1057        request: ListAgentsRequest,
1058    ) -> impl Stream<Item = ListAgentsResult> + Send + 'static {
1059        let storage = self.storage.clone();
1060
1061        stream! {
1062            match storage.list_agents(&request.session_id).await {
1063                Ok(agents) => {
1064                    yield ListAgentsResult::Ok { agents };
1065                }
1066                Err(e) => {
1067                    yield ListAgentsResult::Err {
1068                        message: format!("Failed to list agents: {}", e),
1069                    };
1070                }
1071            }
1072        }
1073    }
1074
1075    /// Get specific agent info
1076    #[plexus_macros::method]
1077    async fn get_agent(
1078        &self,
1079        request: GetAgentRequest,
1080    ) -> impl Stream<Item = GetAgentResult> + Send + 'static {
1081        let storage = self.storage.clone();
1082
1083        stream! {
1084            match storage.get_agent(&request.agent_id).await {
1085                Ok(agent) => {
1086                    yield GetAgentResult::Ok { agent };
1087                }
1088                Err(e) => {
1089                    yield GetAgentResult::Err {
1090                        message: format!("Agent not found: {}", e),
1091                    };
1092                }
1093            }
1094        }
1095    }
1096
1097    /// List pending approval requests for a session
1098    ///
1099    /// Returns all approval requests awaiting manual approval.
1100    /// Only relevant when auto_approve is disabled.
1101    #[plexus_macros::method]
1102    async fn list_pending_approvals(
1103        &self,
1104        request: ListApprovalsRequest,
1105    ) -> impl Stream<Item = ListApprovalsResult> + Send + 'static {
1106        let loopback = self.loopback.clone();
1107        let session_id = request.session_id;
1108
1109        stream! {
1110            match loopback.storage().list_pending(Some(&session_id)).await {
1111                Ok(approvals) => {
1112                    let approval_infos: Vec<ApprovalInfo> = approvals
1113                        .into_iter()
1114                        .map(|approval| ApprovalInfo {
1115                            approval_id: approval.id.to_string(),
1116                            session_id: approval.session_id,
1117                            tool_name: approval.tool_name,
1118                            tool_use_id: approval.tool_use_id,
1119                            tool_input: approval.input,
1120                            created_at: chrono::DateTime::from_timestamp(approval.created_at, 0)
1121                                .map(|dt| dt.to_rfc3339())
1122                                .unwrap_or_else(|| approval.created_at.to_string()),
1123                        })
1124                        .collect();
1125
1126                    yield ListApprovalsResult::Ok {
1127                        approvals: approval_infos,
1128                    };
1129                }
1130                Err(e) => {
1131                    yield ListApprovalsResult::Err {
1132                        message: format!("Failed to list pending approvals: {}", e),
1133                    };
1134                }
1135            }
1136        }
1137    }
1138
1139    /// Approve a pending request
1140    ///
1141    /// Approves a tool use request and unblocks the waiting agent.
1142    /// The approval_id comes from list_pending_approvals.
1143    #[plexus_macros::method]
1144    async fn approve_request(
1145        &self,
1146        request: ApproveRequest,
1147    ) -> impl Stream<Item = ApprovalActionResult> + Send + 'static {
1148        let loopback = self.loopback.clone();
1149        let approval_id = request.approval_id.clone();
1150        let message = request.message.clone();
1151
1152        stream! {
1153            match uuid::Uuid::parse_str(&approval_id) {
1154                Ok(uuid_id) => {
1155                    match loopback.storage()
1156                        .resolve_approval(&uuid_id, true, message.clone())
1157                        .await
1158                    {
1159                        Ok(_) => {
1160                            yield ApprovalActionResult::Ok {
1161                                approval_id: approval_id.clone(),
1162                                message: Some("Approved".to_string()),
1163                            };
1164                        }
1165                        Err(e) => {
1166                            yield ApprovalActionResult::Err {
1167                                message: format!("Failed to approve: {}", e),
1168                            };
1169                        }
1170                    }
1171                }
1172                Err(_) => {
1173                    yield ApprovalActionResult::Err {
1174                        message: format!("Invalid approval_id format: {}", approval_id),
1175                    };
1176                }
1177            }
1178        }
1179    }
1180
1181    /// Deny a pending request
1182    ///
1183    /// Denies a tool use request. The agent will receive an error
1184    /// and may adapt or fail depending on its error handling.
1185    #[plexus_macros::method]
1186    async fn deny_request(
1187        &self,
1188        request: DenyRequest,
1189    ) -> impl Stream<Item = ApprovalActionResult> + Send + 'static {
1190        let loopback = self.loopback.clone();
1191        let approval_id = request.approval_id.clone();
1192        let reason = request.reason.clone();
1193
1194        stream! {
1195            match uuid::Uuid::parse_str(&approval_id) {
1196                Ok(uuid_id) => {
1197                    match loopback.storage()
1198                        .resolve_approval(&uuid_id, false, reason.clone())
1199                        .await
1200                    {
1201                        Ok(_) => {
1202                            yield ApprovalActionResult::Ok {
1203                                approval_id: approval_id.clone(),
1204                                message: reason.or(Some("Denied".to_string())),
1205                            };
1206                        }
1207                        Err(e) => {
1208                            yield ApprovalActionResult::Err {
1209                                message: format!("Failed to deny: {}", e),
1210                            };
1211                        }
1212                    }
1213                }
1214                Err(_) => {
1215                    yield ApprovalActionResult::Err {
1216                        message: format!("Invalid approval_id format: {}", approval_id),
1217                    };
1218                }
1219            }
1220        }
1221    }
1222
    /// Subscribe to pending approval requests for a graph — push stream for human-in-the-loop UIs.
    ///
    /// Unlike `list_pending_approvals` which is a snapshot, this method streams
    /// `ApprovalPending` events whenever a new approval arrives for the graph.
    /// Use this to drive a UI that shows "Claude wants to run Bash — approve?".
    ///
    /// The stream yields all currently-pending approvals immediately, then waits
    /// for new ones via a `Notify`-based wake-up. Closes `timeout_secs` seconds
    /// after subscription starts (default: 300) — the deadline is computed once
    /// and is not extended by activity on the stream.
    #[plexus_macros::method(params(
        graph_id = "Graph ID to watch for approval requests",
        timeout_secs = "How long to wait before closing (default: 300)"
    ))]
    async fn subscribe_approvals(
        &self,
        graph_id: String,
        timeout_secs: Option<u64>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let loopback_storage = self.loopback.storage();
        let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(300));

        stream! {
            let notifier = loopback_storage.get_or_create_notifier(&graph_id);
            // Fixed cut-off, computed once; `checked_duration_since` below derives
            // the remaining budget each iteration.
            let deadline = std::time::Instant::now() + timeout;
            // Approvals already emitted to this subscriber. list_pending returns
            // everything still pending, so we dedupe to emit each at most once.
            let mut seen_ids: std::collections::HashSet<String> = std::collections::HashSet::new();

            loop {
                // Yield all currently-pending approvals for this graph (skip already-sent ones)
                match loopback_storage.list_pending(Some(&graph_id)).await {
                    Ok(approvals) => {
                        for approval in approvals {
                            let id_str = approval.id.to_string();
                            if seen_ids.contains(&id_str) {
                                continue;
                            }
                            seen_ids.insert(id_str);
                            // Render the unix timestamp as RFC 3339 when representable;
                            // fall back to the raw integer otherwise.
                            let created_at = chrono::DateTime::from_timestamp(approval.created_at, 0)
                                .map(|dt| dt.to_rfc3339())
                                .unwrap_or_else(|| approval.created_at.to_string());
                            yield OrchaEvent::ApprovalPending {
                                approval_id: approval.id.to_string(),
                                graph_id: graph_id.clone(),
                                tool_name: approval.tool_name,
                                tool_input: approval.input,
                                created_at,
                            };
                        }
                    }
                    Err(e) => {
                        // Listing failures are treated as transient: log and retry
                        // on the next wake-up rather than closing the stream.
                        tracing::warn!("subscribe_approvals: failed to list pending: {}", e);
                    }
                }

                // Wait for a new approval notification or timeout
                let remaining = match deadline.checked_duration_since(std::time::Instant::now()) {
                    Some(d) => d,
                    None => break, // timeout expired
                };

                // NOTE(review): an approval arriving between the list_pending call
                // above and notified() here is only caught if the notifier stores a
                // permit (tokio Notify::notify_one does; notify_waiters does not) —
                // confirm get_or_create_notifier's notification semantics.
                tokio::select! {
                    _ = notifier.notified() => {
                        // New approval arrived — loop to re-list and yield
                        continue;
                    }
                    _ = tokio::time::sleep(remaining) => {
                        // Timeout reached — close stream
                        break;
                    }
                }
            }
        }
    }
1295
1296    /// Execute a lattice graph — dispatches nodes by type using Orcha's execution logic.
1297    ///
1298    /// Node types:
1299    /// - `"task"`: run a ClaudeCode session with `spec.data.task` as the prompt
1300    /// - `"synthesize"`: like task, with optional prior-work context from `spec.handle`
1301    /// - `"validate"`: run a shell command from `spec.data.command`
1302    ///
1303    /// Streams OrchaEvent progress events until the graph completes or fails.
1304    #[plexus_macros::method(params(
1305        graph_id = "ID of the lattice graph to execute",
1306        model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
1307        working_directory = "Working directory for task nodes (default: /workspace)"
1308    ))]
1309    async fn run_graph(
1310        &self,
1311        graph_id: String,
1312        model: Option<String>,
1313        working_directory: Option<String>,
1314    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1315        let model_enum = match model.as_deref().unwrap_or("sonnet") {
1316            "opus" => Model::Opus,
1317            "haiku" => Model::Haiku,
1318            _ => Model::Sonnet,
1319        };
1320        let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
1321
1322        let cancel_rx = self.register_cancel(&graph_id).await;
1323        let cancel_registry = self.cancel_registry.clone();
1324        let graph = Arc::new(self.graph_runtime.open_graph(graph_id.clone()));
1325        let claudecode = self.claudecode.clone();
1326        let arbor_storage = self.arbor_storage.clone();
1327        let loopback_storage = self.loopback.storage();
1328        let pm = self.pm.clone();
1329        let graph_runtime = self.graph_runtime.clone();
1330        stream! {
1331            let execution = graph_runner::run_graph_execution(
1332                graph,
1333                claudecode,
1334                arbor_storage,
1335                loopback_storage,
1336                pm,
1337                graph_runtime,
1338                cancel_registry.clone(),
1339                model_enum,
1340                wd,
1341                cancel_rx,
1342                std::collections::HashMap::new(),
1343            );
1344            tokio::pin!(execution);
1345            while let Some(event) = execution.next().await {
1346                yield event;
1347            }
1348            cancel_registry.lock().await.remove(&graph_id);
1349        }
1350    }
1351
    /// Run a complete orchestration task driven by a single planning prompt.
    ///
    /// This is the single-call counterpart to the three-step sequence:
    /// `create_graph` → `add_plan_node` → `run_graph`.
    ///
    /// A Plan node is created that asks Claude to generate and execute a child
    /// graph from the supplied `task` description. Streams `OrchaEvent` progress
    /// events until the graph completes or fails.
    #[plexus_macros::method(params(
        task = "Natural-language task — passed directly to Claude as the planning prompt",
        model = "Model for all nodes: opus, sonnet, haiku (default: sonnet)",
        working_directory = "Working directory for task nodes (default: /workspace)"
    ))]
    async fn run_plan(
        &self,
        task: String,
        model: Option<String>,
        working_directory: Option<String>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        // Keep the raw model string too: it is persisted into the graph metadata
        // below so the run configuration can be recovered later.
        let model_str = model.as_deref().unwrap_or("sonnet").to_string();
        let model_enum = match model_str.as_str() {
            "opus" => Model::Opus,
            "haiku" => Model::Haiku,
            _ => Model::Sonnet,
        };
        let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
        // Clone the Arc handles the 'static stream below will own.
        let graph_runtime = self.graph_runtime.clone();
        let cancel_registry = self.cancel_registry.clone();
        let claudecode = self.claudecode.clone();
        let arbor = self.arbor_storage.clone();
        let lb = self.loopback.storage();
        let pm = self.pm.clone();

        stream! {
            // Stash the run configuration in graph metadata under a reserved key.
            let metadata = serde_json::json!({
                "_plexus_run_config": {
                    "model": model_str,
                    "working_directory": wd,
                }
            });
            let graph = match graph_runtime.create_graph(metadata).await {
                Ok(g) => Arc::new(g),
                Err(e) => {
                    // No graph exists yet, so there is no meaningful session_id.
                    yield OrchaEvent::Failed { session_id: String::new(), error: e };
                    return;
                }
            };
            let graph_id = graph.graph_id.clone();

            // Single Plan node whose prompt is the caller's task, verbatim.
            let node_id = match graph.add_plan(task.clone()).await {
                Ok(id) => id,
                Err(e) => {
                    yield OrchaEvent::Failed { session_id: graph_id, error: e };
                    return;
                }
            };

            // PM bookkeeping: record the ticket→node mapping and the source text.
            // Errors are ignored (`let _`) — presumably best-effort, since the
            // execution below does not depend on them; TODO confirm.
            let ticket_map: std::collections::HashMap<String, String> =
                [("plan".to_string(), node_id.clone())].into_iter().collect();
            let _ = pm.save_ticket_map(&graph_id, &ticket_map).await;
            let _ = pm.save_ticket_source(&graph_id, &task).await;

            // Register a cancel token before execution so cancel_graph can stop
            // this run from the moment nodes start dispatching.
            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);

            // Inverse of ticket_map: lets the runner label events by ticket id.
            let node_to_ticket: std::collections::HashMap<String, String> =
                [(node_id, "plan".to_string())].into_iter().collect();
            let execution = graph_runner::run_graph_execution(
                graph, claudecode, arbor, lb, pm,
                graph_runtime, cancel_registry.clone(),
                model_enum, wd, cancel_rx, node_to_ticket,
            );
            tokio::pin!(execution);
            while let Some(event) = execution.next().await {
                yield event;
            }
            // Run finished — drop the cancel token.
            cancel_registry.lock().await.remove(&graph_id);
        }
    }
1431
1432    /// Stop a running graph and all its agent tasks.
1433    ///
1434    /// Sends a cancellation signal to all node tasks currently executing within the
1435    /// graph. Each task abandons its chat stream and returns an error, which causes
1436    /// the lattice to mark those nodes as failed and ultimately fail the graph.
1437    ///
1438    /// If the graph is not currently running (no cancel token registered), yields
1439    /// `OrchaEvent::Failed` with a "not found" error.
1440    #[plexus_macros::method(params(
1441        graph_id = "Lattice graph ID to cancel"
1442    ))]
1443    async fn cancel_graph(
1444        &self,
1445        graph_id: String,
1446    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1447        let cancel_registry = self.cancel_registry.clone();
1448        let lattice_storage = self.graph_runtime.storage();
1449        stream! {
1450            // BFS to collect the root graph and all descendant graph IDs.
1451            let mut all_graph_ids: Vec<String> = Vec::new();
1452            let mut to_visit: std::collections::VecDeque<String> = std::collections::VecDeque::new();
1453            to_visit.push_back(graph_id.clone());
1454            while let Some(gid) = to_visit.pop_front() {
1455                all_graph_ids.push(gid.clone());
1456                if let Ok(children) = lattice_storage.get_child_graphs(&gid).await {
1457                    for child in children {
1458                        to_visit.push_back(child.id);
1459                    }
1460                }
1461            }
1462
1463            // Lock the registry once and cancel all collected graphs.
1464            let mut registry = cancel_registry.lock().await;
1465            let root_cancelled = registry.contains_key(&graph_id);
1466            for gid in all_graph_ids {
1467                if let Some(cancel_tx) = registry.remove(&gid) {
1468                    let _ = cancel_tx.send(true);
1469                }
1470            }
1471
1472            if root_cancelled {
1473                yield OrchaEvent::Cancelled { graph_id };
1474            } else {
1475                yield OrchaEvent::Failed {
1476                    session_id: graph_id,
1477                    error: "Graph not found in cancel registry (not running or already finished)".to_string(),
1478                };
1479            }
1480        }
1481    }
1482
1483    /// Subscribe to execution events for a graph — reconnectable observation stream.
1484    ///
1485    /// Replays all persisted events from `after_seq` (exclusive) and then tails
1486    /// live events until the graph reaches `GraphDone` or `GraphFailed`.
1487    ///
1488    /// Use `after_seq = 0` (or omit) to start from the beginning.  On reconnect,
1489    /// pass the last sequence number seen to resume without gaps.
1490    ///
1491    /// This is observation only — no nodes are dispatched.  Pair with
1492    /// `run_tickets_async` or `run_graph` (spawned) to drive execution.
1493    ///
1494    /// Client workflow:
1495    /// ```text
1496    /// graph_id = run_tickets_async(...)     # fires and forgets
1497    /// events   = subscribe_graph(graph_id)  # observe
1498    /// # on disconnect:
1499    /// events   = subscribe_graph(graph_id, after_seq=last_seen_seq)
1500    /// ```
1501    #[plexus_macros::method(params(
1502        graph_id = "Lattice graph ID from run_tickets_async or build_tickets",
1503        after_seq = "Sequence number to resume from (0 or omit to start from beginning)"
1504    ))]
1505    async fn subscribe_graph(
1506        &self,
1507        graph_id: String,
1508        after_seq: Option<u64>,
1509    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1510        let graph = self.graph_runtime.open_graph(graph_id.clone());
1511        let pm = self.pm.clone();
1512        stream! {
1513            // Load the ticket_id→node_id map from pm storage and invert it to node_id→ticket_id.
1514            let node_to_ticket: HashMap<String, String> = pm.get_ticket_map(&graph_id).await
1515                .unwrap_or_default()
1516                .into_iter()
1517                .map(|(ticket_id, node_id)| (node_id, ticket_id))
1518                .collect();
1519
1520            let event_stream = graph.watch(after_seq);
1521            tokio::pin!(event_stream);
1522
1523            // Progress tracking: count total nodes once, then track completions.
1524            let total_nodes: usize = graph.count_nodes().await.unwrap_or(0);
1525            let mut complete_nodes: usize = 0;
1526
1527            fn calc_percentage(complete: usize, total: usize) -> Option<u32> {
1528                if total == 0 { return None; }
1529                Some((complete as f32 / total as f32 * 100.0) as u32)
1530            }
1531
1532            while let Some(crate::activations::lattice::LatticeEventEnvelope { event, .. }) = event_stream.next().await {
1533                match event {
1534                    crate::activations::lattice::LatticeEvent::NodeReady { node_id, .. } => {
1535                        let ticket_id = node_to_ticket.get(&node_id).cloned();
1536                        yield OrchaEvent::NodeStarted {
1537                            node_id,
1538                            label: None,
1539                            ticket_id,
1540                            percentage: calc_percentage(complete_nodes, total_nodes),
1541                        };
1542                    }
1543                    crate::activations::lattice::LatticeEvent::NodeStarted { .. } => {
1544                        // Already emitted NodeStarted on NodeReady; suppress duplicate.
1545                    }
1546                    crate::activations::lattice::LatticeEvent::NodeDone { node_id, .. } => {
1547                        complete_nodes += 1;
1548                        let ticket_id = node_to_ticket.get(&node_id).cloned();
1549                        yield OrchaEvent::NodeComplete {
1550                            node_id,
1551                            label: None,
1552                            ticket_id,
1553                            output_summary: None,
1554                            percentage: calc_percentage(complete_nodes, total_nodes),
1555                        };
1556                    }
1557                    crate::activations::lattice::LatticeEvent::NodeFailed { node_id, error } => {
1558                        complete_nodes += 1;
1559                        let ticket_id = node_to_ticket.get(&node_id).cloned();
1560                        yield OrchaEvent::NodeFailed {
1561                            node_id,
1562                            label: None,
1563                            ticket_id,
1564                            error,
1565                            percentage: calc_percentage(complete_nodes, total_nodes),
1566                        };
1567                    }
1568                    crate::activations::lattice::LatticeEvent::GraphDone { graph_id } => {
1569                        yield OrchaEvent::Complete { session_id: graph_id };
1570                        return;
1571                    }
1572                    crate::activations::lattice::LatticeEvent::GraphFailed { graph_id, node_id, error } => {
1573                        yield OrchaEvent::Failed {
1574                            session_id: graph_id,
1575                            error: format!("Node {} failed: {}", node_id, error),
1576                        };
1577                        return;
1578                    }
1579                }
1580            }
1581        }
1582    }
1583
    /// Like `subscribe_graph` but recursively follows child graphs created by Plan nodes,
    /// multiplexing all events into one stream. Ends only when the ROOT graph completes or
    /// fails.
    ///
    /// When a Plan node runs, it creates a child graph and executes it.
    /// `subscribe_graph` only sees the Plan node completing — all of the child graph's
    /// NodeStarted/NodeComplete/NodeFailed events are invisible to the caller.
    /// `watch_graph_tree` fixes this by polling for newly created child graphs every 500 ms
    /// and subscribing to each one as it appears.
    #[plexus_macros::method(params(
        graph_id = "Root graph ID to watch (recursively includes all child graphs)",
        after_seq = "Sequence number for the root graph to resume from (0 or omit)"
    ))]
    async fn watch_graph_tree(
        &self,
        graph_id: String,
        after_seq: Option<u64>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let graph_runtime = self.graph_runtime.clone();
        let pm = self.pm.clone();
        let root_id = graph_id.clone();
        stream! {
            // All per-graph watcher tasks push into this single channel; the
            // stream body below is the sole consumer.
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<OrchaEvent>();
            // Graph IDs that already have a watcher, shared with the discovery task
            // so the same graph is never subscribed twice.
            let known_ids: Arc<tokio::sync::Mutex<std::collections::HashSet<String>>> =
                Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new()));

            // Spawn watcher for the root graph.
            known_ids.lock().await.insert(root_id.clone());
            {
                let gr = graph_runtime.clone();
                let pm_w = pm.clone();
                let tx_w = tx.clone();
                let rid = root_id.clone();
                tokio::spawn(async move {
                    // Only the root watcher honors the caller's resume position.
                    watch_single_graph(rid, after_seq, gr, pm_w, tx_w).await;
                });
            }

            // Discovery task: poll every 500 ms for newly-created child graphs.
            {
                let lattice_storage = graph_runtime.storage();
                let known = known_ids.clone();
                let tx_disc = tx.clone();
                let gr_disc = graph_runtime.clone();
                let pm_disc = pm.clone();
                tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                        // Consumer stream dropped → no receivers left; stop polling.
                        if tx_disc.is_closed() {
                            break;
                        }
                        // Snapshot the known set so the storage query below runs
                        // without holding the lock across an await.
                        let current_known: Vec<String> =
                            known.lock().await.iter().cloned().collect();
                        for gid in current_known {
                            if let Ok(children) = lattice_storage.get_child_graphs(&gid).await {
                                for child in children {
                                    let mut guard = known.lock().await;
                                    if !guard.contains(&child.id) {
                                        guard.insert(child.id.clone());
                                        // Drop the guard before spawning so the lock
                                        // is not held while the new task starts.
                                        drop(guard);
                                        let gr = gr_disc.clone();
                                        let pm_c = pm_disc.clone();
                                        let tx_c = tx_disc.clone();
                                        let cid = child.id;
                                        tokio::spawn(async move {
                                            // Child graphs are watched from the
                                            // beginning (after_seq = None).
                                            watch_single_graph(cid, None, gr, pm_c, tx_c).await;
                                        });
                                    }
                                }
                            }
                        }
                    }
                });
            }

            // Forward events; stop when the root graph reports terminal status.
            // Child-graph Complete/Failed events are forwarded but do NOT end the
            // stream — only the root's terminal event does.
            while let Some(event) = rx.recv().await {
                let is_root_terminal = matches!(&event,
                    OrchaEvent::Complete { session_id } | OrchaEvent::Failed { session_id, .. }
                    if session_id == &root_id
                );
                yield event;
                if is_root_terminal {
                    break;
                }
            }
        }
    }
1672
1673    // ─── Graph Builder API ───────────────────────────────────────────────────────
1674
1675    /// Create an empty Orcha execution graph.
1676    #[plexus_macros::method(params(
1677        metadata = "Arbitrary JSON metadata attached to the graph"
1678    ))]
1679    async fn create_graph(
1680        &self,
1681        metadata: Value,
1682    ) -> impl Stream<Item = OrchaCreateGraphResult> + Send + 'static {
1683        let graph_runtime = self.graph_runtime.clone();
1684        stream! {
1685            match graph_runtime.create_graph(metadata).await {
1686                Ok(graph) => yield OrchaCreateGraphResult::Ok { graph_id: graph.graph_id },
1687                Err(e) => yield OrchaCreateGraphResult::Err { message: e },
1688            }
1689        }
1690    }
1691
1692    /// Add a task node — Claude runs `task` as a prompt.
1693    #[plexus_macros::method(params(
1694        graph_id = "Graph to add the node to",
1695        task = "Prompt for Claude to execute"
1696    ))]
1697    async fn add_task_node(
1698        &self,
1699        graph_id: String,
1700        task: String,
1701    ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1702        let graph = self.graph_runtime.open_graph(graph_id);
1703        stream! {
1704            match graph.add_task(task, None).await {
1705                Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1706                Err(e) => yield OrchaAddNodeResult::Err { message: e },
1707            }
1708        }
1709    }
1710
1711    /// Add a synthesize node — like task, with prior_work context prepended from input tokens.
1712    #[plexus_macros::method(params(
1713        graph_id = "Graph to add the node to",
1714        task = "Synthesis prompt for Claude"
1715    ))]
1716    async fn add_synthesize_node(
1717        &self,
1718        graph_id: String,
1719        task: String,
1720    ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1721        let graph = self.graph_runtime.open_graph(graph_id);
1722        stream! {
1723            match graph.add_synthesize(task, None).await {
1724                Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1725                Err(e) => yield OrchaAddNodeResult::Err { message: e },
1726            }
1727        }
1728    }
1729
1730    /// Add a validate node — runs `command` in a shell.
1731    #[plexus_macros::method(params(
1732        graph_id = "Graph to add the node to",
1733        command = "Shell command to validate (exit 0 = pass)",
1734        cwd = "Working directory (default: /workspace)"
1735    ))]
1736    async fn add_validate_node(
1737        &self,
1738        graph_id: String,
1739        command: String,
1740        cwd: Option<String>,
1741    ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1742        let graph = self.graph_runtime.open_graph(graph_id);
1743        stream! {
1744            match graph.add_validate(command, cwd, None).await {
1745                Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1746                Err(e) => yield OrchaAddNodeResult::Err { message: e },
1747            }
1748        }
1749    }
1750
1751    /// Add a gather node — engine-internal, auto-executes when all inbound tokens arrive.
1752    #[plexus_macros::method(params(
1753        graph_id = "Graph to add the node to",
1754        strategy = "Gather strategy: {\"type\":\"all\"} or {\"type\":\"first\",\"n\":N}"
1755    ))]
1756    async fn add_gather_node(
1757        &self,
1758        graph_id: String,
1759        strategy: GatherStrategy,
1760    ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1761        let graph = self.graph_runtime.open_graph(graph_id);
1762        stream! {
1763            match graph.add_gather(strategy).await {
1764                Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1765                Err(e) => yield OrchaAddNodeResult::Err { message: e },
1766            }
1767        }
1768    }
1769
1770    /// Add a SubGraph node — when dispatched, runs the child graph to completion.
1771    ///
1772    /// On child success, the parent node receives `{"child_graph_id": "..."}` as output.
1773    /// On child failure, the parent node is failed (error edge fires if present).
1774    #[plexus_macros::method(params(
1775        graph_id = "Graph to add the node to",
1776        child_graph_id = "ID of the graph to run as a sub-graph"
1777    ))]
1778    async fn add_subgraph_node(
1779        &self,
1780        graph_id: String,
1781        child_graph_id: String,
1782    ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1783        let graph = self.graph_runtime.open_graph(graph_id);
1784        stream! {
1785            match graph.add_subgraph(child_graph_id).await {
1786                Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1787                Err(e) => yield OrchaAddNodeResult::Err { message: e },
1788            }
1789        }
1790    }
1791
1792    /// Declare that `dependent_node_id` waits for `dependency_node_id` to complete.
1793    #[plexus_macros::method(params(
1794        graph_id = "Graph containing both nodes",
1795        dependent_node_id = "Node that must wait",
1796        dependency_node_id = "Node that must complete first"
1797    ))]
1798    async fn add_dependency(
1799        &self,
1800        graph_id: String,
1801        dependent_node_id: String,
1802        dependency_node_id: String,
1803    ) -> impl Stream<Item = OrchaAddDependencyResult> + Send + 'static {
1804        let graph = self.graph_runtime.open_graph(graph_id);
1805        stream! {
1806            match graph.depends_on(&dependent_node_id, &dependency_node_id).await {
1807                Ok(()) => yield OrchaAddDependencyResult::Ok,
1808                Err(e) => yield OrchaAddDependencyResult::Err { message: e },
1809            }
1810        }
1811    }
1812
1813    /// Compile a ticket file and build the lattice graph without executing it.
1814    ///
1815    /// Returns the graph_id.  Use `orcha.run_graph(graph_id)` to execute it
1816    /// separately, or `orcha.run_tickets` to build and run in one call.
1817    #[plexus_macros::method(params(
1818        tickets = "Raw ticket file content",
1819        metadata = "Arbitrary JSON metadata attached to the graph"
1820    ))]
1821    async fn build_tickets(
1822        &self,
1823        tickets: String,
1824        metadata: Value,
1825    ) -> impl Stream<Item = OrchaCreateGraphResult> + Send + 'static {
1826        let graph_runtime = self.graph_runtime.clone();
1827        let pm = self.pm.clone();
1828        stream! {
1829            let compiled = match ticket_compiler::compile_tickets(&tickets) {
1830                Ok(c) => c,
1831                Err(e) => {
1832                    yield OrchaCreateGraphResult::Err {
1833                        message: format!("Ticket compile error: {}", e),
1834                    };
1835                    return;
1836                }
1837            };
1838            match build_graph_from_definition(
1839                graph_runtime, metadata, compiled.nodes, compiled.edges,
1840            ).await {
1841                Ok((graph_id, id_map)) => {
1842                    let _ = pm.save_ticket_map(&graph_id, &id_map).await;
1843                    yield OrchaCreateGraphResult::Ok { graph_id };
1844                }
1845                Err(e) => yield OrchaCreateGraphResult::Err { message: e },
1846            }
1847        }
1848    }
1849
1850    /// Compile a ticket file and execute the resulting graph.
1851    ///
1852    /// Parses the ticket DSL, builds a graph, and streams execution events.
1853    ///
1854    /// # Ticket Format
1855    ///
1856    /// ```text
1857    /// --- <id> [<type>] [> dep1, dep2, ...]
1858    /// task: <text>
1859    /// validate: <shell command>   (optional; auto-creates sibling validate node)
1860    /// ```
1861    ///
1862    /// Types: `agent`, `agent/synthesize`, `prog`
1863    #[plexus_macros::method(params(
1864        tickets = "Raw ticket file content",
1865        metadata = "Arbitrary JSON metadata attached to the graph",
1866        model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
1867        working_directory = "Working directory for task nodes (default: /workspace)"
1868    ))]
1869    async fn run_tickets(
1870        &self,
1871        tickets: String,
1872        metadata: Value,
1873        model: Option<String>,
1874        working_directory: Option<String>,
1875    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1876        let graph_runtime = self.graph_runtime.clone();
1877        let claudecode = self.claudecode.clone();
1878        let arbor_storage = self.arbor_storage.clone();
1879        let loopback_storage = self.loopback.storage();
1880        let pm = self.pm.clone();
1881        let cancel_registry = self.cancel_registry.clone();
1882        stream! {
1883            let compiled = match ticket_compiler::compile_tickets(&tickets) {
1884                Ok(c) => c,
1885                Err(e) => {
1886                    yield OrchaEvent::Failed {
1887                        session_id: "tickets".to_string(),
1888                        error: format!("Ticket compile error: {}", e),
1889                    };
1890                    return;
1891                }
1892            };
1893            let model_str = model.as_deref().unwrap_or("sonnet").to_string();
1894            let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
1895            let mut enriched_metadata = if metadata.is_object() {
1896                metadata.clone()
1897            } else {
1898                serde_json::json!({})
1899            };
1900            enriched_metadata["_plexus_run_config"] = serde_json::json!({
1901                "model": model_str,
1902                "working_directory": wd,
1903            });
1904            let (graph_id, id_map) = match build_graph_from_definition(
1905                graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
1906            ).await {
1907                Ok(pair) => pair,
1908                Err(e) => {
1909                    yield OrchaEvent::Failed {
1910                        session_id: "tickets".to_string(),
1911                        error: e,
1912                    };
1913                    return;
1914                }
1915            };
1916            let _ = pm.save_ticket_map(&graph_id, &id_map).await;
1917            let _ = pm.save_ticket_source(&graph_id, &tickets).await;
1918
1919            yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
1920
1921            let model_enum = match model_str.as_str() {
1922                "opus" => Model::Opus,
1923                "haiku" => Model::Haiku,
1924                _ => Model::Sonnet,
1925            };
1926
1927            // Validate working directory early — before the graph starts executing —
1928            // so the caller gets a clear error instead of every node failing with an
1929            // opaque Claude CLI exit message.
1930            if !std::path::Path::new(&wd).is_dir() {
1931                yield OrchaEvent::Failed {
1932                    session_id: "tickets".to_string(),
1933                    error: format!(
1934                        "Working directory does not exist: '{}'. \
1935                         Create it before running tickets or pass an existing path.",
1936                        wd
1937                    ),
1938                };
1939                return;
1940            }
1941
1942            // Build a node_id → ticket_id map from id_map (which is ticket_id → node_id).
1943            let node_to_ticket: std::collections::HashMap<String, String> = id_map
1944                .iter()
1945                .map(|(ticket, node)| (node.clone(), ticket.clone()))
1946                .collect();
1947
1948            let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
1949
1950            // Register a cancel token so this graph can be stopped via cancel_graph.
1951            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1952            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
1953
1954            // Spawn execution in background — caller can disconnect safely and use
1955            // subscribe_graph(graph_id) to re-attach at any time.
1956            tokio::spawn(async move {
1957                let execution = graph_runner::run_graph_execution(
1958                    graph,
1959                    claudecode,
1960                    arbor_storage,
1961                    loopback_storage,
1962                    pm,
1963                    graph_runtime,
1964                    cancel_registry.clone(),
1965                    model_enum,
1966                    wd,
1967                    cancel_rx,
1968                    node_to_ticket,
1969                );
1970                tokio::pin!(execution);
1971                while let Some(event) = execution.next().await {
1972                    match &event {
1973                        OrchaEvent::Failed { error, .. } => {
1974                            tracing::error!("run_tickets graph {} failed: {}", graph_id, error);
1975                        }
1976                        OrchaEvent::Complete { .. } => {
1977                            tracing::info!("run_tickets graph {} complete", graph_id);
1978                        }
1979                        _ => {}
1980                    }
1981                }
1982                cancel_registry.lock().await.remove(&graph_id);
1983            });
1984        }
1985    }
1986
    /// Compile a ticket file, build the lattice graph, and start execution in the background.
    ///
    /// Returns immediately after yielding a single `GraphStarted { graph_id }` event.
    /// Execution continues in a detached tokio task; use `subscribe_graph(graph_id)` to
    /// observe progress, or `pm.graph_status(graph_id)` to poll completion.
    ///
    /// NOTE(review): `run_tickets` also spawns execution into a detached task and ends its
    /// stream after `GraphStarted`, so the historical "blocks until the graph completes"
    /// distinction between the two entry points appears stale — the visible difference is
    /// only where the working-directory check happens.  Confirm whether both are needed.
    #[plexus_macros::method(params(
        tickets = "Raw ticket file content",
        metadata = "Arbitrary JSON metadata",
        model = "Model: opus, sonnet, haiku (default: sonnet)",
        working_directory = "Working directory (default: /workspace)"
    ))]
    async fn run_tickets_async(
        &self,
        tickets: String,
        metadata: Value,
        model: Option<String>,
        working_directory: Option<String>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let graph_runtime = self.graph_runtime.clone();
        let claudecode = self.claudecode.clone();
        let arbor_storage = self.arbor_storage.clone();
        let loopback_storage = self.loopback.storage();
        let pm = self.pm.clone();
        let cancel_registry = self.cancel_registry.clone();
        stream! {
            let compiled = match ticket_compiler::compile_tickets(&tickets) {
                Ok(c) => c,
                Err(e) => {
                    yield OrchaEvent::Failed {
                        session_id: "tickets".to_string(),
                        error: format!("Ticket compile error: {}", e),
                    };
                    return;
                }
            };

            let model_str = model.as_deref().unwrap_or("sonnet").to_string();
            let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());

            // Validate working directory before building the graph so the caller
            // gets a clear error rather than every node failing with an opaque message.
            if !std::path::Path::new(&wd).is_dir() {
                yield OrchaEvent::Failed {
                    session_id: "tickets".to_string(),
                    error: format!(
                        "Working directory does not exist: '{}'. \
                         Create it before running tickets or pass an existing path.",
                        wd
                    ),
                };
                return;
            }

            // Record the run configuration in graph metadata for later inspection.
            let mut enriched_metadata = if metadata.is_object() {
                metadata.clone()
            } else {
                serde_json::json!({})
            };
            enriched_metadata["_plexus_run_config"] = serde_json::json!({
                "model": model_str,
                "working_directory": wd,
            });

            let (graph_id, id_map) = match build_graph_from_definition(
                graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
            ).await {
                Ok(pair) => pair,
                Err(e) => {
                    yield OrchaEvent::Failed {
                        session_id: "tickets".to_string(),
                        error: e,
                    };
                    return;
                }
            };

            // Persist the ticket↔node mapping and raw ticket source; best-effort.
            let _ = pm.save_ticket_map(&graph_id, &id_map).await;
            let _ = pm.save_ticket_source(&graph_id, &tickets).await;

            yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };

            let model_enum = match model_str.as_str() {
                "opus" => Model::Opus,
                "haiku" => Model::Haiku,
                _ => Model::Sonnet,
            };

            let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));

            // Build a node_id → ticket_id map from id_map (which is ticket_id → node_id).
            let node_to_ticket: std::collections::HashMap<String, String> = id_map
                .iter()
                .map(|(ticket, node)| (node.clone(), ticket.clone()))
                .collect();

            // Register a cancel token so this graph can be stopped via cancel_graph.
            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);

            // Detached execution task; observers re-attach via subscribe_graph.
            tokio::spawn(async move {
                let execution = graph_runner::run_graph_execution(
                    graph,
                    claudecode,
                    arbor_storage,
                    loopback_storage,
                    pm,
                    graph_runtime,
                    cancel_registry.clone(),
                    model_enum,
                    wd,
                    cancel_rx,
                    node_to_ticket,
                );
                tokio::pin!(execution);
                // Only terminal outcomes are logged here; full event stream is
                // available through subscribe_graph.
                while let Some(event) = execution.next().await {
                    match &event {
                        OrchaEvent::Failed { error, .. } => {
                            tracing::error!(
                                "run_tickets_async graph {} failed: {}",
                                graph_id, error
                            );
                        }
                        OrchaEvent::Complete { .. } => {
                            tracing::info!("run_tickets_async graph {} complete", graph_id);
                        }
                        _ => {}
                    }
                }
                // Execution finished — deregister the cancel token.
                cancel_registry.lock().await.remove(&graph_id);
            });
        }
    }
2123
2124    /// Read one or more ticket files from disk, concatenate them, compile, and run.
2125    ///
2126    /// Equivalent to reading each file and passing the joined content to `run_tickets`.
2127    /// Files are joined with a blank line separator; the compiler ignores preamble and
2128    /// section boundaries so cross-file `blocked_by` references work correctly.
2129    ///
2130    /// Streams OrchaEvents until the graph completes or fails.
2131    #[plexus_macros::method(params(
2132        paths = "Absolute paths to ticket markdown files, e.g. [\"/workspace/plans/batch.tickets.md\"]",
2133        metadata = "Arbitrary JSON metadata attached to the graph",
2134        model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2135        working_directory = "Working directory for task nodes (default: /workspace)"
2136    ))]
2137    async fn run_tickets_files(
2138        &self,
2139        paths: Vec<String>,
2140        metadata: Value,
2141        model: Option<String>,
2142        working_directory: Option<String>,
2143    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2144        let graph_runtime = self.graph_runtime.clone();
2145        let claudecode = self.claudecode.clone();
2146        let arbor_storage = self.arbor_storage.clone();
2147        let loopback_storage = self.loopback.storage();
2148        let pm = self.pm.clone();
2149        let cancel_registry = self.cancel_registry.clone();
2150        stream! {
2151            // Read and concatenate all files.
2152            let mut parts: Vec<String> = Vec::new();
2153            for path in &paths {
2154                match tokio::fs::read_to_string(path).await {
2155                    Ok(content) => parts.push(content),
2156                    Err(e) => {
2157                        yield OrchaEvent::Failed {
2158                            session_id: "tickets".to_string(),
2159                            error: format!("Failed to read '{}': {}", path, e),
2160                        };
2161                        return;
2162                    }
2163                }
2164            }
2165            let tickets = parts.join("\n\n");
2166
2167            let compiled = match ticket_compiler::compile_tickets(&tickets) {
2168                Ok(c) => c,
2169                Err(e) => {
2170                    yield OrchaEvent::Failed {
2171                        session_id: "tickets".to_string(),
2172                        error: format!("Ticket compile error: {}", e),
2173                    };
2174                    return;
2175                }
2176            };
2177            let model_str = model.as_deref().unwrap_or("sonnet").to_string();
2178            let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
2179            let mut enriched_metadata = if metadata.is_object() { metadata.clone() } else { serde_json::json!({}) };
2180            enriched_metadata["_plexus_run_config"] = serde_json::json!({
2181                "model": model_str,
2182                "working_directory": wd,
2183            });
2184            let (graph_id, id_map) = match build_graph_from_definition(
2185                graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
2186            ).await {
2187                Ok(pair) => pair,
2188                Err(e) => {
2189                    yield OrchaEvent::Failed { session_id: "tickets".to_string(), error: e };
2190                    return;
2191                }
2192            };
2193            let _ = pm.save_ticket_map(&graph_id, &id_map).await;
2194            let _ = pm.save_ticket_source(&graph_id, &tickets).await;
2195
2196            yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
2197
2198            let model_enum = match model_str.as_str() {
2199                "opus" => Model::Opus,
2200                "haiku" => Model::Haiku,
2201                _ => Model::Sonnet,
2202            };
2203            if !std::path::Path::new(&wd).is_dir() {
2204                yield OrchaEvent::Failed {
2205                    session_id: "tickets".to_string(),
2206                    error: format!("Working directory does not exist: '{}'", wd),
2207                };
2208                return;
2209            }
2210            let node_to_ticket: std::collections::HashMap<String, String> = id_map
2211                .iter().map(|(t, n)| (n.clone(), t.clone())).collect();
2212            let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
2213            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2214            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
2215            let execution = graph_runner::run_graph_execution(
2216                graph, claudecode, arbor_storage, loopback_storage, pm,
2217                graph_runtime, cancel_registry.clone(),
2218                model_enum, wd, cancel_rx, node_to_ticket,
2219            );
2220            tokio::pin!(execution);
2221            while let Some(event) = execution.next().await {
2222                yield event;
2223            }
2224            cancel_registry.lock().await.remove(&graph_id);
2225        }
2226    }
2227
2228    /// Like `run_tickets_files` but fire-and-forget — returns `GraphStarted` immediately.
2229    ///
2230    /// Use `subscribe_graph(graph_id)` to observe progress after this call returns.
2231    #[plexus_macros::method(params(
2232        paths = "Absolute paths to ticket markdown files",
2233        metadata = "Arbitrary JSON metadata attached to the graph",
2234        model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2235        working_directory = "Working directory for task nodes (default: /workspace)"
2236    ))]
2237    async fn run_tickets_async_files(
2238        &self,
2239        paths: Vec<String>,
2240        metadata: Value,
2241        model: Option<String>,
2242        working_directory: Option<String>,
2243    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2244        let graph_runtime = self.graph_runtime.clone();
2245        let claudecode = self.claudecode.clone();
2246        let arbor_storage = self.arbor_storage.clone();
2247        let loopback_storage = self.loopback.storage();
2248        let pm = self.pm.clone();
2249        let cancel_registry = self.cancel_registry.clone();
2250        stream! {
2251            let mut parts: Vec<String> = Vec::new();
2252            for path in &paths {
2253                match tokio::fs::read_to_string(path).await {
2254                    Ok(content) => parts.push(content),
2255                    Err(e) => {
2256                        yield OrchaEvent::Failed {
2257                            session_id: "tickets".to_string(),
2258                            error: format!("Failed to read '{}': {}", path, e),
2259                        };
2260                        return;
2261                    }
2262                }
2263            }
2264            let tickets = parts.join("\n\n");
2265
2266            let compiled = match ticket_compiler::compile_tickets(&tickets) {
2267                Ok(c) => c,
2268                Err(e) => {
2269                    yield OrchaEvent::Failed {
2270                        session_id: "tickets".to_string(),
2271                        error: format!("Ticket compile error: {}", e),
2272                    };
2273                    return;
2274                }
2275            };
2276            let model_str = model.as_deref().unwrap_or("sonnet").to_string();
2277            let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
2278            let mut enriched_metadata = if metadata.is_object() { metadata.clone() } else { serde_json::json!({}) };
2279            enriched_metadata["_plexus_run_config"] = serde_json::json!({
2280                "model": model_str,
2281                "working_directory": wd,
2282            });
2283            let (graph_id, id_map) = match build_graph_from_definition(
2284                graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
2285            ).await {
2286                Ok(pair) => pair,
2287                Err(e) => {
2288                    yield OrchaEvent::Failed { session_id: "tickets".to_string(), error: e };
2289                    return;
2290                }
2291            };
2292            let _ = pm.save_ticket_map(&graph_id, &id_map).await;
2293            let _ = pm.save_ticket_source(&graph_id, &tickets).await;
2294
2295            yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
2296
2297            let model_enum = match model_str.as_str() {
2298                "opus" => Model::Opus,
2299                "haiku" => Model::Haiku,
2300                _ => Model::Sonnet,
2301            };
2302            if !std::path::Path::new(&wd).is_dir() {
2303                yield OrchaEvent::Failed {
2304                    session_id: "tickets".to_string(),
2305                    error: format!("Working directory does not exist: '{}'", wd),
2306                };
2307                return;
2308            }
2309            let node_to_ticket: std::collections::HashMap<String, String> = id_map
2310                .iter().map(|(t, n)| (n.clone(), t.clone())).collect();
2311            let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
2312            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2313            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
2314            tokio::spawn(async move {
2315                let execution = graph_runner::run_graph_execution(
2316                    graph, claudecode, arbor_storage, loopback_storage, pm,
2317                    graph_runtime, cancel_registry.clone(),
2318                    model_enum, wd, cancel_rx, node_to_ticket,
2319                );
2320                tokio::pin!(execution);
2321                while let Some(event) = execution.next().await {
2322                    match &event {
2323                        OrchaEvent::Failed { error, .. } => {
2324                            tracing::error!("run_tickets_async_files graph {} failed: {}", graph_id, error);
2325                        }
2326                        OrchaEvent::Complete { .. } => {
2327                            tracing::info!("run_tickets_async_files graph {} complete", graph_id);
2328                        }
2329                        _ => {}
2330                    }
2331                }
2332                cancel_registry.lock().await.remove(&graph_id);
2333            });
2334        }
2335    }
2336
2337    /// Build and execute a graph from an inline definition.
2338    ///
2339    /// Nodes use caller-supplied string ids; edges reference those ids.
2340    /// Streams OrchaEvents. The graph_id appears in progress and complete/failed events.
2341    #[plexus_macros::method(params(
2342        metadata = "Arbitrary JSON metadata attached to the graph",
2343        model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2344        working_directory = "Working directory for task nodes (default: /workspace)",
2345        nodes = "Array of OrchaNodeDef: [{\"id\":\"...\",\"spec\":{\"type\":\"task\",\"task\":\"...\"}}]",
2346        edges = "Array of OrchaEdgeDef: [{\"from\":\"id1\",\"to\":\"id2\"}]"
2347    ))]
2348    async fn run_graph_definition(
2349        &self,
2350        metadata: Value,
2351        model: Option<String>,
2352        working_directory: Option<String>,
2353        nodes: Vec<OrchaNodeDef>,
2354        edges: Vec<OrchaEdgeDef>,
2355    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2356        build_and_run_graph_definition(
2357            self.graph_runtime.clone(),
2358            self.claudecode.clone(),
2359            self.arbor_storage.clone(),
2360            self.loopback.storage(),
2361            self.cancel_registry.clone(),
2362            self.pm.clone(),
2363            metadata,
2364            model,
2365            working_directory,
2366            nodes,
2367            edges,
2368        )
2369    }
2370}
2371
2372// ═══════════════════════════════════════════════════════════════════════════
2373// Graph Construction (shared pure-build step)
2374// ═══════════════════════════════════════════════════════════════════════════
2375
2376/// Build a lattice graph from a node+edge definition.
2377///
2378/// Creates the graph, adds all nodes (returning their lattice node-ids),
2379/// and wires all edges.  Returns `(graph_id, ticket_id→node_id map)` on success.
2380///
2381/// This is the shared foundation used by `build_tickets`, `build_graph_definition`,
2382/// and `build_and_run_graph_definition`.
2383async fn build_graph_from_definition(
2384    graph_runtime: Arc<GraphRuntime>,
2385    metadata: Value,
2386    nodes: Vec<OrchaNodeDef>,
2387    edges: Vec<OrchaEdgeDef>,
2388) -> Result<(String, HashMap<String, String>), String> {
2389    let graph = graph_runtime
2390        .create_graph(metadata)
2391        .await
2392        .map_err(|e| format!("Failed to create graph: {}", e))?;
2393    let graph_id = graph.graph_id.clone();
2394
2395    let mut id_map: HashMap<String, String> = HashMap::new();
2396    for OrchaNodeDef { id, spec } in nodes {
2397        let result = match spec {
2398            OrchaNodeSpec::Task { task, max_retries } => graph.add_task(task, max_retries).await,
2399            OrchaNodeSpec::Synthesize { task, max_retries } => graph.add_synthesize(task, max_retries).await,
2400            OrchaNodeSpec::Validate { command, cwd, max_retries } => graph.add_validate(command, cwd, max_retries).await,
2401            OrchaNodeSpec::Gather { strategy } => graph.add_gather(strategy).await,
2402            OrchaNodeSpec::Review { prompt } => graph.add_review(prompt).await,
2403            OrchaNodeSpec::Plan { task } => graph.add_plan(task).await,
2404        };
2405        let lattice_id = match result {
2406            Ok(lid) => lid,
2407            Err(e) => return Err(format!("Failed to add node '{}': {}", id, e)),
2408        };
2409        id_map.insert(id, lattice_id);
2410    }
2411
2412    for OrchaEdgeDef { from, to } in edges {
2413        let dep_id = id_map
2414            .get(&from)
2415            .ok_or_else(|| format!("Unknown node id in edge.from: '{}'", from))?
2416            .clone();
2417        let node_id = id_map
2418            .get(&to)
2419            .ok_or_else(|| format!("Unknown node id in edge.to: '{}'", to))?
2420            .clone();
2421        graph
2422            .depends_on(&node_id, &dep_id)
2423            .await
2424            .map_err(|e| format!("Failed to add edge {} → {}: {}", from, to, e))?;
2425    }
2426
2427    Ok((graph_id, id_map))
2428}
2429
2430// ─── Build + run ─────────────────────────────────────────────────────────────
2431
2432fn build_and_run_graph_definition<P: HubContext + 'static>(
2433    graph_runtime: Arc<GraphRuntime>,
2434    claudecode: Arc<ClaudeCode<P>>,
2435    arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
2436    loopback_storage: Arc<crate::activations::claudecode_loopback::LoopbackStorage>,
2437    cancel_registry: CancelRegistry,
2438    pm: Arc<super::pm::Pm>,
2439    metadata: Value,
2440    model: Option<String>,
2441    working_directory: Option<String>,
2442    nodes: Vec<OrchaNodeDef>,
2443    edges: Vec<OrchaEdgeDef>,
2444) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2445    stream! {
2446        let (graph_id, _) = match build_graph_from_definition(
2447            graph_runtime.clone(), metadata, nodes, edges,
2448        ).await {
2449            Ok(pair) => pair,
2450            Err(e) => {
2451                yield OrchaEvent::Failed {
2452                    session_id: "graph_definition".to_string(),
2453                    error: e,
2454                };
2455                return;
2456            }
2457        };
2458
2459        yield OrchaEvent::Progress {
2460            message: format!("Graph {} ready, starting execution", graph_id),
2461            percentage: None,
2462        };
2463
2464        let model_enum = match model.as_deref().unwrap_or("sonnet") {
2465            "opus" => Model::Opus,
2466            "haiku" => Model::Haiku,
2467            _ => Model::Sonnet,
2468        };
2469        let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
2470
2471        // Register a cancel token so this graph can be stopped via cancel_graph.
2472        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2473        cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
2474
2475        let execution = graph_runner::run_graph_execution(
2476            Arc::new(graph_runtime.open_graph(graph_id.clone())),
2477            claudecode,
2478            arbor_storage,
2479            loopback_storage,
2480            pm,
2481            graph_runtime,
2482            cancel_registry.clone(),
2483            model_enum,
2484            wd,
2485            cancel_rx,
2486            std::collections::HashMap::new(),
2487        );
2488        tokio::pin!(execution);
2489        while let Some(event) = execution.next().await {
2490            yield event;
2491        }
2492        cancel_registry.lock().await.remove(&graph_id);
2493    }
2494}
2495
2496// ═══════════════════════════════════════════════════════════════════════════
2497// Helper Functions (original)
2498// ═══════════════════════════════════════════════════════════════════════════
2499
2500/// Extract validation artifact from accumulated text
2501fn extract_validation_artifact(text: &str) -> Option<ValidationArtifact> {
2502    // Look for {"orcha_validate": {...}} pattern
2503    use regex::Regex;
2504
2505    let re = match Regex::new(r#"\{"orcha_validate"\s*:\s*(\{[^}]+\})\}"#) {
2506        Ok(re) => re,
2507        Err(e) => {
2508            tracing::warn!("Failed to compile orcha_validate regex: {}", e);
2509            return None;
2510        }
2511    };
2512    let captures = re.captures(text)?;
2513    let json_str = captures.get(1)?.as_str();
2514
2515    match serde_json::from_str::<ValidationArtifact>(json_str) {
2516        Ok(artifact) => Some(artifact),
2517        Err(e) => {
2518            tracing::warn!("Failed to parse validation artifact JSON '{}': {}", json_str, e);
2519            None
2520        }
2521    }
2522}
2523
2524/// Run a validation test command
2525async fn run_validation_test(artifact: &ValidationArtifact) -> ValidationResult {
2526    let output = Command::new("sh")
2527        .arg("-c")
2528        .arg(&artifact.test_command)
2529        .current_dir(&artifact.cwd)
2530        .output()
2531        .await;
2532
2533    match output {
2534        Ok(output) => ValidationResult {
2535            success: output.status.success(),
2536            output: String::from_utf8_lossy(&output.stdout).to_string()
2537                + &String::from_utf8_lossy(&output.stderr).to_string(),
2538            exit_code: output.status.code(),
2539        },
2540        Err(e) => ValidationResult {
2541            success: false,
2542            output: format!("Failed to execute command: {}", e),
2543            exit_code: None,
2544        },
2545    }
2546}
2547
2548/// Format an arbor tree into a readable conversation
2549///
2550/// Converts the JSON-based arbor tree structure into a human-readable conversation format
2551fn format_conversation_from_tree(tree: &crate::activations::arbor::Tree) -> String {
2552    use crate::activations::arbor::NodeType;
2553
2554    let mut output = String::new();
2555    let mut current_role = String::new();
2556    let mut message_text = String::new();
2557    let mut tool_uses = Vec::new();
2558
2559    // Walk the tree in order
2560    fn walk_nodes(
2561        tree: &crate::activations::arbor::Tree,
2562        node_id: &crate::activations::arbor::NodeId,
2563        output: &mut String,
2564        current_role: &mut String,
2565        message_text: &mut String,
2566        tool_uses: &mut Vec<String>,
2567    ) {
2568        if let Some(node) = tree.nodes.get(node_id) {
2569            if let NodeType::Text { content } = &node.data {
2570                // Try to parse as JSON to extract event type
2571                if let Ok(event) = serde_json::from_str::<serde_json::Value>(content) {
2572                    if let Some(event_type) = event.get("type").and_then(|v| v.as_str()) {
2573                        match event_type {
2574                            "user_message" => {
2575                                // Flush previous message
2576                                flush_message(output, current_role, message_text, tool_uses);
2577
2578                                *current_role = "User".to_string();
2579                                if let Some(content) = event.get("content").and_then(|v| v.as_str()) {
2580                                    *message_text = content.to_string();
2581                                }
2582                            }
2583                            "assistant_start" => {
2584                                // Flush previous message
2585                                flush_message(output, current_role, message_text, tool_uses);
2586
2587                                *current_role = "Assistant".to_string();
2588                                *message_text = String::new();
2589                            }
2590                            "content_text" => {
2591                                if let Some(text) = event.get("text").and_then(|v| v.as_str()) {
2592                                    message_text.push_str(text);
2593                                }
2594                            }
2595                            "content_tool_use" => {
2596                                if let Some(name) = event.get("name").and_then(|v| v.as_str()) {
2597                                    let mut tool_str = format!("[Tool: {}]", name);
2598                                    if let Some(input) = event.get("input") {
2599                                        if let Ok(input_str) = serde_json::to_string_pretty(input) {
2600                                            // Limit tool input to 200 chars
2601                                            let trimmed = if input_str.len() > 200 {
2602                                                format!("{}...", &input_str[..200])
2603                                            } else {
2604                                                input_str
2605                                            };
2606                                            tool_str.push_str(&format!(" {}", trimmed));
2607                                        }
2608                                    }
2609                                    tool_uses.push(tool_str);
2610                                }
2611                            }
2612                            _ => {} // Ignore other event types
2613                        }
2614                    }
2615                }
2616            }
2617
2618            // Recursively walk children
2619            for child_id in &node.children {
2620                walk_nodes(tree, child_id, output, current_role, message_text, tool_uses);
2621            }
2622        }
2623    }
2624
2625    fn flush_message(
2626        output: &mut String,
2627        current_role: &str,
2628        message_text: &str,
2629        tool_uses: &mut Vec<String>,
2630    ) {
2631        if !current_role.is_empty() && (!message_text.is_empty() || !tool_uses.is_empty()) {
2632            output.push_str(&format!("{}:\n", current_role));
2633            if !message_text.is_empty() {
2634                output.push_str(message_text);
2635                output.push_str("\n");
2636            }
2637            for tool in tool_uses.drain(..) {
2638                output.push_str(&format!("  {}\n", tool));
2639            }
2640            output.push_str("\n");
2641        }
2642    }
2643
2644    // Start walking from root
2645    walk_nodes(tree, &tree.root, &mut output, &mut current_role, &mut message_text, &mut tool_uses);
2646
2647    // Flush any remaining message
2648    flush_message(&mut output, &current_role, &message_text, &mut tool_uses);
2649
2650    output
2651}
2652
2653/// Save a status summary to the arbor monitoring tree
2654///
2655/// Creates a monitoring tree for the session (if it doesn't exist) and appends
2656/// the summary as a new text node with timestamp.
2657async fn save_status_summary_to_arbor(
2658    arbor_storage: &crate::activations::arbor::ArborStorage,
2659    session_id: &str,
2660    summary: &str,
2661) -> Result<(), String> {
2662    use crate::activations::arbor::TreeId;
2663
2664    // Generate deterministic tree ID from path: orcha.<session-id>.monitor
2665    let tree_path = format!("orcha.{}.monitor", session_id);
2666    let tree_uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, tree_path.as_bytes());
2667    let monitor_tree_id = TreeId::from(tree_uuid);
2668
2669    // Try to get existing tree, create if it doesn't exist
2670    let tree = match arbor_storage.tree_get(&monitor_tree_id).await {
2671        Ok(tree) => tree,
2672        Err(_) => {
2673            // Tree doesn't exist, create it with our deterministic ID
2674            let metadata = serde_json::json!({
2675                "type": "orcha_monitor",
2676                "session_id": session_id,
2677                "tree_path": tree_path
2678            });
2679
2680            let created_tree_id = arbor_storage.tree_create_with_id(
2681                Some(monitor_tree_id),
2682                Some(metadata),
2683                "orcha",
2684            ).await.map_err(|e| e.to_string())?;
2685
2686            arbor_storage.tree_get(&created_tree_id).await
2687                .map_err(|e| e.to_string())?
2688        }
2689    };
2690
2691    // Find the latest summary node to append to, or use root
2692    let parent = tree.nodes.values()
2693        .filter(|n| matches!(n.data, crate::activations::arbor::NodeType::Text { .. }))
2694        .max_by_key(|n| n.created_at)
2695        .map(|n| n.id)
2696        .unwrap_or(tree.root);
2697
2698    // Append summary as a text node with timestamp
2699    let timestamp = chrono::Utc::now().to_rfc3339();
2700    let summary_content = format!(
2701        "[{}] {}\n",
2702        timestamp,
2703        summary.trim()
2704    );
2705
2706    arbor_storage.node_create_text(
2707        &tree.id,
2708        Some(parent),
2709        summary_content,
2710        None,
2711    ).await.map_err(|e| e.to_string())?;
2712
2713    Ok(())
2714}
2715
2716/// Generate summary for a single agent
2717async fn generate_agent_summary<P: HubContext>(
2718    claudecode: &ClaudeCode<P>,
2719    arbor_storage: &crate::activations::arbor::ArborStorage,
2720    agent: AgentInfo,
2721) -> Result<AgentSummary, String> {
2722    use futures::StreamExt;
2723
2724    // Get conversation tree for this agent's ClaudeCode session
2725    let cc_session = claudecode.storage.session_get_by_name(&agent.claudecode_session_id).await
2726        .map_err(|e| format!("Failed to get CC session: {}", e))?;
2727
2728    let tree = arbor_storage.tree_get(&cc_session.head.tree_id).await
2729        .map_err(|e| format!("Failed to get tree: {}", e))?;
2730
2731    let conversation = format_conversation_from_tree(&tree);
2732
2733    // Create ephemeral session to generate summary
2734    let summary_session = format!("orcha-agent-summary-{}", Uuid::new_v4());
2735    let summary_session_id = format!("{}-agent-summary-{}", agent.session_id, Uuid::new_v4());
2736
2737    let create_stream = claudecode.create(
2738        summary_session.clone(),
2739        "/workspace".to_string(),
2740        crate::activations::claudecode::Model::Haiku,
2741        None,
2742        Some(false),
2743        Some(summary_session_id), // Track ephemeral summary session
2744    ).await;
2745    tokio::pin!(create_stream);
2746
2747    // Wait for creation
2748    let mut created = false;
2749    while let Some(result) = create_stream.next().await {
2750        if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
2751            created = true;
2752            break;
2753        }
2754    }
2755
2756    if !created {
2757        return Err("Failed to create summary session".to_string());
2758    }
2759
2760    // Ask for summary
2761    let prompt = format!(
2762        "Summarize this agent's work in 2-3 sentences:\n\n\
2763         Subtask: {}\n\
2764         State: {:?}\n\n\
2765         Conversation:\n{}\n\n\
2766         Be concise and focus on what was accomplished or is in progress.",
2767        agent.subtask,
2768        agent.state,
2769        conversation
2770    );
2771
2772    let chat_stream = claudecode.chat(summary_session, prompt, Some(true), None).await;
2773    tokio::pin!(chat_stream);
2774
2775    let mut summary = String::new();
2776    while let Some(event) = chat_stream.next().await {
2777        if let crate::activations::claudecode::ChatEvent::Content { text } = event {
2778            summary.push_str(&text);
2779        }
2780    }
2781
2782    Ok(AgentSummary {
2783        agent_id: agent.agent_id,
2784        subtask: agent.subtask,
2785        state: agent.state,
2786        summary,
2787    })
2788}
2789
2790/// Generate overall meta-summary combining all agent work
2791async fn generate_overall_summary<P: HubContext>(
2792    claudecode: &ClaudeCode<P>,
2793    session_id: &SessionId,
2794    agent_summaries: &[AgentSummary],
2795) -> Option<String> {
2796    use futures::StreamExt;
2797
2798    let summary_session = format!("orcha-meta-summary-{}", Uuid::new_v4());
2799    let meta_summary_session_id = format!("{}-meta-summary-{}", session_id, Uuid::new_v4());
2800
2801    // Create session
2802    let create_stream = claudecode.create(
2803        summary_session.clone(),
2804        "/workspace".to_string(),
2805        crate::activations::claudecode::Model::Haiku,
2806        None,
2807        Some(false),
2808        Some(meta_summary_session_id), // Track meta-summary under parent session
2809    ).await;
2810    tokio::pin!(create_stream);
2811
2812    let mut created = false;
2813    while let Some(result) = create_stream.next().await {
2814        if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
2815            created = true;
2816            break;
2817        }
2818    }
2819
2820    if !created {
2821        return None;
2822    }
2823
2824    // Build prompt with all agent summaries
2825    let mut agent_list = String::new();
2826    for (i, summary) in agent_summaries.iter().enumerate() {
2827        agent_list.push_str(&format!(
2828            "{}. {} ({:?})\n   {}\n\n",
2829            i + 1,
2830            summary.subtask,
2831            summary.state,
2832            summary.summary
2833        ));
2834    }
2835
2836    let prompt = format!(
2837        "This is a multi-agent orchestration session with {} agents working on different subtasks.\n\n\
2838         Agent summaries:\n{}\n\
2839         Provide a 2-4 sentence overall summary of the session's progress and coordination.\n\
2840         Focus on: what's the big picture? What's been accomplished? What's still in progress?",
2841        agent_summaries.len(),
2842        agent_list
2843    );
2844
2845    let chat_stream = claudecode.chat(summary_session, prompt, Some(true), None).await;
2846    tokio::pin!(chat_stream);
2847
2848    let mut summary = String::new();
2849    while let Some(event) = chat_stream.next().await {
2850        if let crate::activations::claudecode::ChatEvent::Content { text } = event {
2851            summary.push_str(&text);
2852        }
2853    }
2854
2855    Some(summary)
2856}