1use super::graph_runner;
2use super::graph_runtime::GraphRuntime;
3use super::orchestrator::run_orchestration_task;
4use super::pm;
5use super::storage::OrchaStorage;
6use super::ticket_compiler;
7use super::types::*;
8use crate::activations::claudecode::{ClaudeCode, Model};
9use crate::activations::claudecode_loopback::ClaudeCodeLoopback;
10use crate::plexus::{Activation, ChildRouter, ChildSummary, HubContext, NoParent, PlexusError, PlexusStream};
11use async_stream::stream;
12use async_trait::async_trait;
13use futures::Stream;
14use futures::StreamExt;
15use plexus_macros::activation;
16use serde_json::Value;
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use tokio::process::Command;
21use uuid::Uuid;
22
/// Shared map from graph ID to a cancellation signal sender, allowing an
/// in-flight graph execution to be cancelled from another task.
type CancelRegistry = Arc<tokio::sync::Mutex<HashMap<String, tokio::sync::watch::Sender<bool>>>>;
28
/// Orchestration activation: coordinates sessions, agents, and lattice graph
/// execution on top of ClaudeCode, with approval loopback and PM ticketing.
#[derive(Clone)]
pub struct Orcha<P: HubContext = NoParent> {
    // Session/agent persistence for orchestration state.
    storage: Arc<OrchaStorage>,
    // ClaudeCode activation used to drive agent conversations.
    claudecode: Arc<ClaudeCode<P>>,
    // Approval loopback for tool-use permission requests.
    loopback: Arc<ClaudeCodeLoopback>,
    // Arbor tree storage for conversation/monitor trees.
    arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
    // Runtime for opening and executing lattice graphs.
    graph_runtime: Arc<GraphRuntime>,
    // Project-manager child activation (tickets <-> graph nodes).
    pm: Arc<pm::Pm>,
    // Per-graph cancellation senders; see `CancelRegistry`.
    cancel_registry: CancelRegistry,
    _phantom: PhantomData<P>,
}
44
impl<P: HubContext> Orcha<P> {
    /// Builds the orchestrator from its shared dependencies, starting with an
    /// empty cancellation registry.
    pub fn new(
        storage: Arc<OrchaStorage>,
        claudecode: Arc<ClaudeCode<P>>,
        loopback: Arc<ClaudeCodeLoopback>,
        arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
        graph_runtime: Arc<GraphRuntime>,
        pm: Arc<pm::Pm>,
    ) -> Self {
        Self {
            storage,
            claudecode,
            loopback,
            arbor_storage,
            graph_runtime,
            pm,
            cancel_registry: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            _phantom: PhantomData,
        }
    }

    /// Creates a watch channel for cancelling the execution of `graph_id`,
    /// stores the sender in the registry, and returns the receiver to hand
    /// to the graph execution.
    async fn register_cancel(&self, graph_id: &str) -> tokio::sync::watch::Receiver<bool> {
        let (tx, rx) = tokio::sync::watch::channel(false);
        self.cancel_registry.lock().await.insert(graph_id.to_string(), tx);
        rx
    }

    /// Drops the cancellation sender for `graph_id`, if registered.
    #[allow(dead_code)]
    async fn unregister_cancel(&self, graph_id: &str) {
        self.cancel_registry.lock().await.remove(graph_id);
    }

    /// Advertises the PM activation as this hub's plugin child.
    pub fn plugin_children(&self) -> Vec<ChildSummary> {
        let schema = Activation::plugin_schema(&*self.pm);
        vec![ChildSummary {
            namespace: schema.namespace,
            description: schema.description,
            hash: schema.hash,
        }]
    }

    /// Resumes PM-managed graphs that were still marked running when the
    /// process last stopped.
    ///
    /// For each graph that both PM and the lattice store consider running:
    /// interrupted `Running` nodes are reset to `Ready`, `Ready` nodes get
    /// their readiness events re-emitted, and a detached task is spawned to
    /// drive the graph execution to completion. Errors at each step are
    /// logged and skipped so one bad graph does not block the others.
    pub async fn recover_running_graphs(&self)
    where
        P: 'static,
    {
        use crate::activations::lattice::NodeStatus;
        use futures::StreamExt;

        let lattice_storage = self.graph_runtime.storage();

        let pm_graph_ids = match self.pm.list_all_graph_ids().await {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!("recovery: failed to list PM graph IDs: {}", e);
                return;
            }
        };

        let running_ids = match lattice_storage.get_running_graph_ids().await {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!("recovery: failed to query running graphs: {}", e);
                return;
            }
        };

        // Only recover graphs that PM still tracks.
        let pm_set: std::collections::HashSet<String> = pm_graph_ids.into_iter().collect();
        let to_recover: Vec<String> = running_ids
            .into_iter()
            .filter(|id| pm_set.contains(id))
            .collect();

        if to_recover.is_empty() {
            tracing::debug!("recovery: no running PM graphs to recover");
            return;
        }

        tracing::info!("recovery: recovering {} running graph(s)", to_recover.len());

        for graph_id in to_recover {
            let storage = lattice_storage.clone();
            let graph_id_clone = graph_id.clone();

            let nodes = match storage.get_nodes(&graph_id).await {
                Ok(n) => n,
                Err(e) => {
                    tracing::warn!("recovery: get_nodes({}) failed: {}", graph_id, e);
                    continue;
                }
            };

            for node in &nodes {
                match node.status {
                    // A node that was mid-flight when the process died must
                    // be re-queued from scratch.
                    NodeStatus::Running => {
                        tracing::info!(
                            "recovery: re-dispatching interrupted node {} in graph {}",
                            node.id, graph_id
                        );
                        if let Err(e) = storage.reset_running_to_ready(&graph_id, &node.id).await {
                            tracing::warn!(
                                "recovery: reset_running_to_ready failed for node {} in {}: {}",
                                node.id, graph_id, e
                            );
                        }
                    }
                    // Ready nodes only need their readiness re-announced.
                    // `reemit_ready_nodes` takes only the graph ID, so it
                    // presumably covers every ready node at once — hence the
                    // `break` out of the node scan after one call.
                    NodeStatus::Ready => {
                        tracing::info!(
                            "recovery: re-emitting NodeReady for node {} in graph {}",
                            node.id, graph_id
                        );
                        if let Err(e) = storage.reemit_ready_nodes(&graph_id).await {
                            tracing::warn!(
                                "recovery: reemit_ready_nodes({}) failed: {}",
                                graph_id, e
                            );
                        }
                        break;
                    }
                    _ => {}
                }
            }

            let graph = Arc::new(self.graph_runtime.open_graph(graph_id_clone.clone()));
            let cc = self.claudecode.clone();
            let arbor = self.arbor_storage.clone();
            let lb = self.loopback.storage();
            let cancel_registry = self.cancel_registry.clone();
            let pm_for_recovery = self.pm.clone();
            let graph_runtime_recovery = self.graph_runtime.clone();

            // The original run configuration (model, working directory) was
            // stashed in the graph metadata under `_plexus_run_config`.
            let graph_meta = lattice_storage.get_graph(&graph_id_clone).await.ok()
                .and_then(|g| g.metadata.get("_plexus_run_config").cloned());

            // Unknown or missing model names fall back to Sonnet.
            let model_enum = graph_meta.as_ref()
                .and_then(|c| c.get("model"))
                .and_then(|m| m.as_str())
                .map(|s| match s {
                    "opus" => crate::activations::claudecode::Model::Opus,
                    "haiku" => crate::activations::claudecode::Model::Haiku,
                    _ => crate::activations::claudecode::Model::Sonnet,
                })
                .unwrap_or(crate::activations::claudecode::Model::Sonnet);

            let working_directory = graph_meta.as_ref()
                .and_then(|c| c.get("working_directory"))
                .and_then(|w| w.as_str())
                .unwrap_or("/workspace")
                .to_string();

            // PM stores ticket -> node; invert to node -> ticket so graph
            // events can be tagged with their originating ticket.
            let node_to_ticket: std::collections::HashMap<String, String> = pm_for_recovery
                .get_ticket_map(&graph_id_clone)
                .await
                .unwrap_or_default()
                .into_iter()
                .map(|(ticket_id, node_id)| (node_id, ticket_id))
                .collect();

            tokio::spawn(async move {
                tracing::info!("recovery: spawning run_graph_execution for {}", graph_id_clone);
                // Fresh cancellation channel so the recovered run can still
                // be cancelled through the registry.
                let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
                cancel_registry.lock().await.insert(graph_id_clone.clone(), cancel_tx);

                let execution = graph_runner::run_graph_execution(
                    graph,
                    cc,
                    arbor,
                    lb,
                    pm_for_recovery,
                    graph_runtime_recovery,
                    cancel_registry.clone(),
                    model_enum,
                    working_directory,
                    cancel_rx,
                    node_to_ticket,
                );
                tokio::pin!(execution);
                // Drain the event stream: nobody is subscribed during recovery.
                while let Some(_event) = execution.next().await {}
                cancel_registry.lock().await.remove(&graph_id_clone);
                tracing::info!("recovery: graph {} execution complete", graph_id_clone);
            });
        }
    }
}
257
258async fn watch_single_graph(
262 gid: String,
263 after_seq: Option<u64>,
264 graph_runtime: Arc<GraphRuntime>,
265 pm: Arc<pm::Pm>,
266 tx: tokio::sync::mpsc::UnboundedSender<OrchaEvent>,
267) {
268 let graph = graph_runtime.open_graph(gid.clone());
269 let node_to_ticket: HashMap<String, String> = pm
270 .get_ticket_map(&gid)
271 .await
272 .unwrap_or_default()
273 .into_iter()
274 .map(|(ticket_id, node_id)| (node_id, ticket_id))
275 .collect();
276
277 let total_nodes = graph.count_nodes().await.unwrap_or(0);
278 let mut complete_nodes: usize = 0;
279
280 fn calc_pct(complete: usize, total: usize) -> Option<u32> {
281 if total == 0 {
282 None
283 } else {
284 Some((complete as f32 / total as f32 * 100.0) as u32)
285 }
286 }
287
288 let event_stream = graph.watch(after_seq);
289 tokio::pin!(event_stream);
290
291 while let Some(crate::activations::lattice::LatticeEventEnvelope { event, .. }) =
292 event_stream.next().await
293 {
294 let evt = match event {
295 crate::activations::lattice::LatticeEvent::NodeReady { node_id, .. } => {
296 let ticket_id = node_to_ticket.get(&node_id).cloned();
297 Some(OrchaEvent::NodeStarted {
298 node_id,
299 label: None,
300 ticket_id,
301 percentage: calc_pct(complete_nodes, total_nodes),
302 })
303 }
304 crate::activations::lattice::LatticeEvent::NodeStarted { .. } => None,
305 crate::activations::lattice::LatticeEvent::NodeDone { node_id, .. } => {
306 complete_nodes += 1;
307 let ticket_id = node_to_ticket.get(&node_id).cloned();
308 Some(OrchaEvent::NodeComplete {
309 node_id,
310 label: None,
311 ticket_id,
312 output_summary: None,
313 percentage: calc_pct(complete_nodes, total_nodes),
314 })
315 }
316 crate::activations::lattice::LatticeEvent::NodeFailed { node_id, error } => {
317 complete_nodes += 1;
318 let ticket_id = node_to_ticket.get(&node_id).cloned();
319 Some(OrchaEvent::NodeFailed {
320 node_id,
321 label: None,
322 ticket_id,
323 error,
324 percentage: calc_pct(complete_nodes, total_nodes),
325 })
326 }
327 crate::activations::lattice::LatticeEvent::GraphDone { graph_id } => {
328 Some(OrchaEvent::Complete { session_id: graph_id })
329 }
330 crate::activations::lattice::LatticeEvent::GraphFailed {
331 graph_id,
332 node_id,
333 error,
334 } => Some(OrchaEvent::Failed {
335 session_id: graph_id,
336 error: format!("Node {} failed: {}", node_id, error),
337 }),
338 };
339 if let Some(e) = evt {
340 if tx.send(e).is_err() {
341 break;
342 }
343 }
344 }
345}
346
347#[async_trait]
348impl<P: HubContext + 'static> ChildRouter for Orcha<P> {
349 fn router_namespace(&self) -> &str {
350 "orcha"
351 }
352
353 async fn router_call(&self, method: &str, params: Value, auth: Option<&plexus_core::plexus::AuthContext>, raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
354 Activation::call(self, method, params, auth, raw_ctx).await
355 }
356
357 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
358 if name == "pm" {
359 Some(Box::new((*self.pm).clone()))
360 } else {
361 None
362 }
363 }
364}
365
366#[plexus_macros::activation(namespace = "orcha",
367version = "1.0.0",
368description = "Full task orchestration with approval loops and validation",
369hub, crate_path = "plexus_core")]
370impl<P: HubContext> Orcha<P> {
371 #[plexus_macros::method]
379 async fn run_task(
380 &self,
381 request: RunTaskRequest,
382 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
383 run_orchestration_task(
384 self.storage.clone(),
385 self.arbor_storage.clone(),
386 self.claudecode.clone(),
387 self.loopback.clone(),
388 request,
389 None, ).await
391 }
392 #[plexus_macros::method]
397 async fn create_session(
398 &self,
399 request: CreateSessionRequest,
400 ) -> impl Stream<Item = CreateSessionResult> + Send + 'static {
401 let storage = self.storage.clone();
402
403 stream! {
404 let session_id = format!("orcha-{}", Uuid::new_v4());
406
407 let agent_mode = if request.multi_agent {
409 AgentMode::Multi
410 } else {
411 AgentMode::Single
412 };
413
414 let session_result = storage.create_session(
416 session_id.clone(),
417 request.model.clone(),
418 request.working_directory.clone(),
419 request.rules.clone(),
420 request.max_retries,
421 agent_mode,
422 None, ).await;
424
425 match session_result {
426 Ok(session) => {
427 yield CreateSessionResult::Ok {
428 session_id,
429 created_at: session.created_at,
430 };
431 }
432 Err(e) => {
433 yield CreateSessionResult::Err {
434 message: format!("Failed to create session: {}", e),
435 };
436 }
437 }
438 }
439 }
440
441 #[plexus_macros::method]
445 async fn update_session_state(
446 &self,
447 session_id: SessionId,
448 state: SessionState,
449 ) -> impl Stream<Item = UpdateSessionStateResult> + Send + 'static {
450 let storage = self.storage.clone();
451
452 stream! {
453 match storage.update_state(&session_id, state).await {
454 Ok(_) => {
455 yield UpdateSessionStateResult::Ok;
456 }
457 Err(e) => {
458 yield UpdateSessionStateResult::Err {
459 message: format!("Failed to update state: {}", e),
460 };
461 }
462 }
463 }
464 }
465
466 #[plexus_macros::method]
468 async fn get_session(
469 &self,
470 request: GetSessionRequest,
471 ) -> impl Stream<Item = GetSessionResult> + Send + 'static {
472 let storage = self.storage.clone();
473
474 stream! {
475 match storage.get_session(&request.session_id).await {
476 Ok(session) => {
477 yield GetSessionResult::Ok { session };
478 }
479 Err(e) => {
480 yield GetSessionResult::Err {
481 message: format!("Session not found: {}", e),
482 };
483 }
484 }
485 }
486 }
487
488 #[plexus_macros::method]
492 async fn extract_validation(
493 &self,
494 text: String,
495 ) -> impl Stream<Item = ExtractValidationResult> + Send + 'static {
496 stream! {
497 match extract_validation_artifact(&text) {
498 Some(artifact) => {
499 yield ExtractValidationResult::Ok { artifact };
500 }
501 None => {
502 yield ExtractValidationResult::NotFound;
503 }
504 }
505 }
506 }
507
508 #[plexus_macros::method]
512 async fn run_validation(
513 &self,
514 artifact: ValidationArtifact,
515 ) -> impl Stream<Item = RunValidationResult> + Send + 'static {
516 stream! {
517 let result = run_validation_test(&artifact).await;
518
519 yield RunValidationResult::Ok { result };
520 }
521 }
522
523 #[plexus_macros::method]
527 async fn increment_retry(
528 &self,
529 session_id: SessionId,
530 ) -> impl Stream<Item = IncrementRetryResult> + Send + 'static {
531 let storage = self.storage.clone();
532
533 stream! {
534 match storage.increment_retry(&session_id).await {
535 Ok(count) => {
536 let max_retries = match storage.get_session(&session_id).await {
537 Ok(s) => s.max_retries,
538 Err(e) => {
539 tracing::warn!("Failed to get session {} for max_retries lookup: {}", session_id, e);
540 3
541 }
542 };
543
544 yield IncrementRetryResult::Ok {
545 retry_count: count,
546 max_retries,
547 exceeded: count >= max_retries,
548 };
549 }
550 Err(e) => {
551 yield IncrementRetryResult::Err {
552 message: format!("Failed to increment retry: {}", e),
553 };
554 }
555 }
556 }
557 }
558
559 #[plexus_macros::method]
561 async fn list_sessions(&self) -> impl Stream<Item = ListSessionsResult> + Send + 'static {
562 let storage = self.storage.clone();
563
564 stream! {
565 let sessions = storage.list_sessions().await;
566 yield ListSessionsResult::Ok { sessions };
567 }
568 }
569
570 #[plexus_macros::method]
572 async fn delete_session(
573 &self,
574 session_id: SessionId,
575 ) -> impl Stream<Item = DeleteSessionResult> + Send + 'static {
576 let storage = self.storage.clone();
577
578 stream! {
579 match storage.delete_session(&session_id).await {
580 Ok(_) => {
581 yield DeleteSessionResult::Ok;
582 }
583 Err(e) => {
584 yield DeleteSessionResult::Err {
585 message: format!("Failed to delete session: {}", e),
586 };
587 }
588 }
589 }
590 }
591
592 #[plexus_macros::method]
598 async fn run_task_async(
599 &self,
600 request: RunTaskRequest,
601 ) -> impl Stream<Item = RunTaskAsyncResult> + Send + 'static {
602 let storage = self.storage.clone();
603 let arbor_storage = self.arbor_storage.clone();
604 let claudecode = self.claudecode.clone();
605 let loopback = self.loopback.clone();
606
607 stream! {
608 let session_id = format!("orcha-{}", Uuid::new_v4());
610 let session_id_for_spawn = session_id.clone();
611
612 let req = request.clone();
614 tokio::spawn(async move {
615 let stream = run_orchestration_task(
616 storage,
617 arbor_storage,
618 claudecode,
619 loopback,
620 req,
621 Some(session_id_for_spawn), ).await;
623 tokio::pin!(stream);
624
625 while let Some(_event) = stream.next().await {
627 }
630 });
631
632 yield RunTaskAsyncResult::Ok { session_id };
634 }
635 }
636
637 #[plexus_macros::method]
641 async fn list_monitor_trees(
642 &self,
643 ) -> impl Stream<Item = ListMonitorTreesResult> + Send + 'static {
644 let arbor_storage = self.arbor_storage.clone();
645
646 stream! {
647 let filter = serde_json::json!({"type": "orcha_monitor"});
649
650 match arbor_storage.tree_query_by_metadata(&filter).await {
651 Ok(tree_ids) => {
652 let mut trees = Vec::new();
653
654 for tree_id in tree_ids {
656 if let Ok(tree) = arbor_storage.tree_get(&tree_id).await {
657 if let Some(metadata) = &tree.metadata {
658 let session_id = metadata.get("session_id")
659 .and_then(|v| v.as_str())
660 .unwrap_or("unknown")
661 .to_string();
662 let tree_path = metadata.get("tree_path")
663 .and_then(|v| v.as_str())
664 .unwrap_or("unknown")
665 .to_string();
666
667 trees.push(MonitorTreeInfo {
668 tree_id: tree.id.to_string(),
669 session_id,
670 tree_path,
671 });
672 }
673 }
674 }
675
676 yield ListMonitorTreesResult::Ok { trees };
677 }
678 Err(_) => {
679 yield ListMonitorTreesResult::Ok { trees: vec![] };
680 }
681 }
682 }
683 }
684
    /// Summarizes the current status of a session in natural language.
    ///
    /// Multi-agent sessions are summarized per agent (concurrently) and then
    /// rolled up into one overall summary. Single-agent sessions get a
    /// Haiku-generated summary of the session state and, when the session is
    /// running, its live conversation tree. Summaries are persisted to arbor
    /// on a best-effort basis.
    #[plexus_macros::method]
    async fn check_status(
        &self,
        request: CheckStatusRequest,
    ) -> impl Stream<Item = CheckStatusResult> + Send + 'static {
        let claudecode = self.claudecode.clone();
        let arbor_storage = self.arbor_storage.clone();
        let storage = self.storage.clone();
        let session_id = request.session_id.clone();

        stream! {
            // The session record drives everything below; bail out early if
            // it cannot be loaded.
            let session_info = match storage.get_session(&session_id).await {
                Ok(info) => info,
                Err(e) => {
                    yield CheckStatusResult::Err {
                        message: format!("Session not found: {}", e),
                    };
                    return;
                }
            };

            // Multi-agent path: summarize each agent, then roll the per-agent
            // summaries up into a single overall summary.
            if session_info.agent_mode == AgentMode::Multi {
                let agents = match storage.list_agents(&session_id).await {
                    Ok(a) => a,
                    Err(e) => {
                        yield CheckStatusResult::Err {
                            message: format!("Failed to list agents: {}", e),
                        };
                        return;
                    }
                };

                if agents.is_empty() {
                    yield CheckStatusResult::Err {
                        message: "No agents found in session".to_string(),
                    };
                    return;
                }

                // Generate all agent summaries concurrently.
                let summary_futures: Vec<_> = agents.iter().map(|agent| {
                    generate_agent_summary(&claudecode, &arbor_storage, agent.clone())
                }).collect();

                // Failed per-agent summaries are logged and skipped rather
                // than failing the whole status check.
                let agent_summaries: Vec<AgentSummary> = futures::future::join_all(summary_futures)
                    .await
                    .into_iter()
                    .filter_map(|r| match r {
                        Ok(summary) => Some(summary),
                        Err(e) => {
                            tracing::warn!("Failed to generate agent summary: {}", e);
                            None
                        }
                    })
                    .collect();

                let overall_summary = generate_overall_summary(
                    &claudecode,
                    &session_id,
                    &agent_summaries,
                ).await;

                let summary = overall_summary.unwrap_or_else(|| "Unable to generate summary".to_string());

                // Persisting is best-effort: failures are logged but the
                // summary is still returned to the caller.
                match save_status_summary_to_arbor(&arbor_storage, &session_id, &summary).await {
                    Ok(_) => {
                        yield CheckStatusResult::Ok {
                            summary,
                            agent_summaries,
                        };
                    }
                    Err(e) => {
                        tracing::warn!("Failed to save summary to arbor: {}", e);
                        yield CheckStatusResult::Ok {
                            summary,
                            agent_summaries,
                        };
                    }
                }

                return;
            }

            // Single-agent path: describe the session state and, when the
            // session is running, capture its stream ID so the live
            // conversation can be included in the prompt below.
            let (state_description, stream_id_opt) = match &session_info.state {
                SessionState::Idle => ("idle (not currently executing)".to_string(), None),
                SessionState::Running { stream_id, sequence, active_agents, completed_agents, failed_agents } => {
                    let agent_info = if *active_agents > 0 || *completed_agents > 0 || *failed_agents > 0 {
                        format!(" (agents: {} active, {} complete, {} failed)", active_agents, completed_agents, failed_agents)
                    } else {
                        String::new()
                    };
                    (format!("running (stream: {}, sequence: {}{})", stream_id, sequence, agent_info), Some(stream_id.clone()))
                }
                SessionState::WaitingApproval { approval_id } => {
                    (format!("waiting for approval (approval_id: {})", approval_id), None)
                }
                SessionState::Validating { test_command } => {
                    (format!("validating with command: {}", test_command), None)
                }
                SessionState::Complete => ("completed successfully".to_string(), None),
                SessionState::Failed { error } => {
                    (format!("failed with error: {}", error), None)
                }
            };

            // Best-effort fetch of the ClaudeCode conversation tree; any
            // failure just means the summary prompt omits the conversation.
            let conversation_context = if let Some(stream_id) = stream_id_opt {
                match claudecode.storage.session_get_by_name(&stream_id).await {
                    Ok(cc_session) => {
                        match arbor_storage.tree_get(&cc_session.head.tree_id).await {
                            Ok(tree) => {
                                let formatted = format_conversation_from_tree(&tree);
                                Some(formatted)
                            }
                            Err(e) => {
                                tracing::warn!("Failed to get arbor tree for claudecode session {}: {}", stream_id, e);
                                None
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!("Failed to get claudecode session {}: {}", stream_id, e);
                        None
                    }
                }
            } else {
                None
            };

            // Throwaway Haiku session used only to generate the summary text.
            let summary_session = format!("orcha-check-{}", Uuid::new_v4());
            let summary_session_id = format!("{}-check-{}", session_id, Uuid::new_v4());

            let create_stream = claudecode.create(
                summary_session.clone(),
                "/workspace".to_string(),
                crate::activations::claudecode::Model::Haiku,
                None,
                Some(false),
                Some(summary_session_id),
            ).await;
            tokio::pin!(create_stream);

            // Drain the creation stream, remembering whether an Ok appeared.
            let mut create_ok = false;
            while let Some(result) = create_stream.next().await {
                if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
                    create_ok = true;
                }
            }

            if !create_ok {
                yield CheckStatusResult::Err {
                    message: "Failed to create summary session".to_string(),
                };
                return;
            }

            // Two prompt variants: with and without the conversation tree.
            let prompt = if let Some(conversation) = conversation_context {
                format!(
                    "An orcha orchestration session has the following status:\n\n\
                     - Session ID: {}\n\
                     - Model: {}\n\
                     - State: {}\n\
                     - Retry count: {}/{}\n\
                     - Created at: {} (unix timestamp)\n\
                     - Last activity: {} (unix timestamp)\n\n\
                     Here is the actual conversation tree showing what the agent has been doing:\n\n\
                     {}\n\n\
                     Generate a brief, natural language summary (2-3 sentences) of what's happening in this session.\n\
                     Focus on what the agent is currently doing or has accomplished. Be specific about the task.",
                    session_id,
                    session_info.model,
                    state_description,
                    session_info.retry_count,
                    session_info.max_retries,
                    session_info.created_at,
                    session_info.last_activity,
                    conversation
                )
            } else {
                format!(
                    "An orcha orchestration session has the following status:\n\n\
                     - Session ID: {}\n\
                     - Model: {}\n\
                     - State: {}\n\
                     - Retry count: {}/{}\n\
                     - Created at: {} (unix timestamp)\n\
                     - Last activity: {} (unix timestamp)\n\n\
                     Generate a brief, natural language summary (2-3 sentences) of what's happening in this session.\n\
                     Focus on the current state and what the agent is doing or has done.",
                    session_id,
                    session_info.model,
                    state_description,
                    session_info.retry_count,
                    session_info.max_retries,
                    session_info.created_at,
                    session_info.last_activity
                )
            };

            let chat_stream = claudecode.chat(
                summary_session.clone(),
                prompt,
                Some(true),
                None,
            ).await;
            tokio::pin!(chat_stream);

            // Accumulate the streamed completion text into one summary.
            let mut summary = String::new();
            while let Some(event) = chat_stream.next().await {
                if let crate::activations::claudecode::ChatEvent::Content { text } = event {
                    summary.push_str(&text);
                }
            }

            if summary.is_empty() {
                yield CheckStatusResult::Err {
                    message: "Failed to generate summary".to_string(),
                };
            } else {
                // Persisting is best-effort, as in the multi-agent path.
                match save_status_summary_to_arbor(&arbor_storage, &session_id, &summary).await {
                    Ok(_) => {
                        yield CheckStatusResult::Ok {
                            summary,
                            agent_summaries: vec![],
                        };
                    }
                    Err(e) => {
                        tracing::warn!("Failed to save summary to arbor: {}", e);
                        yield CheckStatusResult::Ok {
                            summary,
                            agent_summaries: vec![],
                        };
                    }
                }
            }
        }
    }
942
    /// Spawns an additional agent inside an existing multi-agent session.
    ///
    /// Creates a dedicated ClaudeCode session for the agent, records the
    /// agent in storage, and launches the background task that runs the
    /// subtask with auto-approval enabled.
    #[plexus_macros::method]
    async fn spawn_agent(
        &self,
        request: SpawnAgentRequest,
    ) -> impl Stream<Item = SpawnAgentResult> + Send + 'static {
        let storage = self.storage.clone();
        let claudecode = self.claudecode.clone();
        let loopback = self.loopback.clone();

        stream! {
            let session = match storage.get_session(&request.session_id).await {
                Ok(s) => s,
                Err(e) => {
                    yield SpawnAgentResult::Err {
                        message: format!("Session not found: {}", e),
                    };
                    return;
                }
            };

            // Agents can only be added to sessions created in multi mode.
            if session.agent_mode != AgentMode::Multi {
                yield SpawnAgentResult::Err {
                    message: "Session is not in multi-agent mode".to_string(),
                };
                return;
            }

            // Map the session's model string onto the ClaudeCode enum,
            // defaulting to Sonnet for unknown values.
            let model = match session.model.as_str() {
                "opus" => crate::activations::claudecode::Model::Opus,
                "sonnet" => crate::activations::claudecode::Model::Sonnet,
                "haiku" => crate::activations::claudecode::Model::Haiku,
                _ => crate::activations::claudecode::Model::Sonnet,
            };

            let cc_session_name = format!("orcha-agent-{}", Uuid::new_v4());
            let agent_session_id = format!("{}-agent-{}", session.session_id, Uuid::new_v4());

            let create_stream = claudecode.create(
                cc_session_name.clone(),
                "/workspace".to_string(),
                model.clone(),
                None,
                Some(true),
                Some(agent_session_id),
            ).await;
            tokio::pin!(create_stream);

            // Drain the creation stream; succeed as soon as an Ok appears.
            let mut create_ok = false;
            while let Some(result) = create_stream.next().await {
                if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
                    create_ok = true;
                    break;
                }
            }

            if !create_ok {
                yield SpawnAgentResult::Err {
                    message: "Failed to create ClaudeCode session".to_string(),
                };
                return;
            }

            match storage.create_agent(
                &request.session_id,
                cc_session_name.clone(),
                request.subtask.clone(),
                false,
                request.parent_agent_id,
            ).await {
                Ok(agent) => {
                    let config = super::orchestrator::AgentConfig {
                        model,
                        working_directory: "/workspace".to_string(),
                        max_retries: session.max_retries,
                        task_context: request.subtask.clone(),
                        auto_approve: true,
                    };

                    // Fire-and-forget: the agent task reports its progress
                    // through storage, not through this stream.
                    super::orchestrator::spawn_agent_task(
                        storage.clone(),
                        claudecode.clone(),
                        loopback.clone(),
                        agent.clone(),
                        request.subtask.clone(),
                        config,
                    );

                    yield SpawnAgentResult::Ok {
                        agent_id: agent.agent_id,
                        claudecode_session_id: cc_session_name,
                    };
                }
                Err(e) => {
                    yield SpawnAgentResult::Err {
                        message: format!("Failed to create agent: {}", e),
                    };
                }
            }
        }
    }
1052
1053 #[plexus_macros::method]
1055 async fn list_agents(
1056 &self,
1057 request: ListAgentsRequest,
1058 ) -> impl Stream<Item = ListAgentsResult> + Send + 'static {
1059 let storage = self.storage.clone();
1060
1061 stream! {
1062 match storage.list_agents(&request.session_id).await {
1063 Ok(agents) => {
1064 yield ListAgentsResult::Ok { agents };
1065 }
1066 Err(e) => {
1067 yield ListAgentsResult::Err {
1068 message: format!("Failed to list agents: {}", e),
1069 };
1070 }
1071 }
1072 }
1073 }
1074
1075 #[plexus_macros::method]
1077 async fn get_agent(
1078 &self,
1079 request: GetAgentRequest,
1080 ) -> impl Stream<Item = GetAgentResult> + Send + 'static {
1081 let storage = self.storage.clone();
1082
1083 stream! {
1084 match storage.get_agent(&request.agent_id).await {
1085 Ok(agent) => {
1086 yield GetAgentResult::Ok { agent };
1087 }
1088 Err(e) => {
1089 yield GetAgentResult::Err {
1090 message: format!("Agent not found: {}", e),
1091 };
1092 }
1093 }
1094 }
1095 }
1096
1097 #[plexus_macros::method]
1102 async fn list_pending_approvals(
1103 &self,
1104 request: ListApprovalsRequest,
1105 ) -> impl Stream<Item = ListApprovalsResult> + Send + 'static {
1106 let loopback = self.loopback.clone();
1107 let session_id = request.session_id;
1108
1109 stream! {
1110 match loopback.storage().list_pending(Some(&session_id)).await {
1111 Ok(approvals) => {
1112 let approval_infos: Vec<ApprovalInfo> = approvals
1113 .into_iter()
1114 .map(|approval| ApprovalInfo {
1115 approval_id: approval.id.to_string(),
1116 session_id: approval.session_id,
1117 tool_name: approval.tool_name,
1118 tool_use_id: approval.tool_use_id,
1119 tool_input: approval.input,
1120 created_at: chrono::DateTime::from_timestamp(approval.created_at, 0)
1121 .map(|dt| dt.to_rfc3339())
1122 .unwrap_or_else(|| approval.created_at.to_string()),
1123 })
1124 .collect();
1125
1126 yield ListApprovalsResult::Ok {
1127 approvals: approval_infos,
1128 };
1129 }
1130 Err(e) => {
1131 yield ListApprovalsResult::Err {
1132 message: format!("Failed to list pending approvals: {}", e),
1133 };
1134 }
1135 }
1136 }
1137 }
1138
1139 #[plexus_macros::method]
1144 async fn approve_request(
1145 &self,
1146 request: ApproveRequest,
1147 ) -> impl Stream<Item = ApprovalActionResult> + Send + 'static {
1148 let loopback = self.loopback.clone();
1149 let approval_id = request.approval_id.clone();
1150 let message = request.message.clone();
1151
1152 stream! {
1153 match uuid::Uuid::parse_str(&approval_id) {
1154 Ok(uuid_id) => {
1155 match loopback.storage()
1156 .resolve_approval(&uuid_id, true, message.clone())
1157 .await
1158 {
1159 Ok(_) => {
1160 yield ApprovalActionResult::Ok {
1161 approval_id: approval_id.clone(),
1162 message: Some("Approved".to_string()),
1163 };
1164 }
1165 Err(e) => {
1166 yield ApprovalActionResult::Err {
1167 message: format!("Failed to approve: {}", e),
1168 };
1169 }
1170 }
1171 }
1172 Err(_) => {
1173 yield ApprovalActionResult::Err {
1174 message: format!("Invalid approval_id format: {}", approval_id),
1175 };
1176 }
1177 }
1178 }
1179 }
1180
1181 #[plexus_macros::method]
1186 async fn deny_request(
1187 &self,
1188 request: DenyRequest,
1189 ) -> impl Stream<Item = ApprovalActionResult> + Send + 'static {
1190 let loopback = self.loopback.clone();
1191 let approval_id = request.approval_id.clone();
1192 let reason = request.reason.clone();
1193
1194 stream! {
1195 match uuid::Uuid::parse_str(&approval_id) {
1196 Ok(uuid_id) => {
1197 match loopback.storage()
1198 .resolve_approval(&uuid_id, false, reason.clone())
1199 .await
1200 {
1201 Ok(_) => {
1202 yield ApprovalActionResult::Ok {
1203 approval_id: approval_id.clone(),
1204 message: reason.or(Some("Denied".to_string())),
1205 };
1206 }
1207 Err(e) => {
1208 yield ApprovalActionResult::Err {
1209 message: format!("Failed to deny: {}", e),
1210 };
1211 }
1212 }
1213 }
1214 Err(_) => {
1215 yield ApprovalActionResult::Err {
1216 message: format!("Invalid approval_id format: {}", approval_id),
1217 };
1218 }
1219 }
1220 }
1221 }
1222
    /// Streams pending approval requests for a graph as they appear.
    ///
    /// Re-lists the loopback store whenever its notifier fires, de-duplicates
    /// already-emitted approval IDs, and closes once `timeout_secs` (default
    /// 300) has elapsed.
    #[plexus_macros::method(params(
        graph_id = "Graph ID to watch for approval requests",
        timeout_secs = "How long to wait before closing (default: 300)"
    ))]
    async fn subscribe_approvals(
        &self,
        graph_id: String,
        timeout_secs: Option<u64>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let loopback_storage = self.loopback.storage();
        let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(300));

        stream! {
            let notifier = loopback_storage.get_or_create_notifier(&graph_id);
            let deadline = std::time::Instant::now() + timeout;
            // IDs already emitted, so re-listing after a notification does
            // not produce duplicate events.
            let mut seen_ids: std::collections::HashSet<String> = std::collections::HashSet::new();

            loop {
                match loopback_storage.list_pending(Some(&graph_id)).await {
                    Ok(approvals) => {
                        for approval in approvals {
                            let id_str = approval.id.to_string();
                            if seen_ids.contains(&id_str) {
                                continue;
                            }
                            seen_ids.insert(id_str);
                            // RFC 3339 when the timestamp is representable,
                            // otherwise the raw unix seconds.
                            let created_at = chrono::DateTime::from_timestamp(approval.created_at, 0)
                                .map(|dt| dt.to_rfc3339())
                                .unwrap_or_else(|| approval.created_at.to_string());
                            yield OrchaEvent::ApprovalPending {
                                approval_id: approval.id.to_string(),
                                graph_id: graph_id.clone(),
                                tool_name: approval.tool_name,
                                tool_input: approval.input,
                                created_at,
                            };
                        }
                    }
                    Err(e) => {
                        // Listing failures are transient; keep the
                        // subscription alive and retry on the next wakeup.
                        tracing::warn!("subscribe_approvals: failed to list pending: {}", e);
                    }
                }

                // Time left until the subscription expires; None means the
                // deadline has already passed.
                let remaining = match deadline.checked_duration_since(std::time::Instant::now()) {
                    Some(d) => d,
                    None => break,
                };

                // NOTE(review): an approval created between list_pending and
                // notified() is only picked up if the notifier buffers a
                // permit — verify the Notify usage in the loopback storage.
                tokio::select! {
                    _ = notifier.notified() => {
                        continue;
                    }
                    _ = tokio::time::sleep(remaining) => {
                        break;
                    }
                }
            }
        }
    }
1295
1296 #[plexus_macros::method(params(
1305 graph_id = "ID of the lattice graph to execute",
1306 model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
1307 working_directory = "Working directory for task nodes (default: /workspace)"
1308 ))]
1309 async fn run_graph(
1310 &self,
1311 graph_id: String,
1312 model: Option<String>,
1313 working_directory: Option<String>,
1314 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1315 let model_enum = match model.as_deref().unwrap_or("sonnet") {
1316 "opus" => Model::Opus,
1317 "haiku" => Model::Haiku,
1318 _ => Model::Sonnet,
1319 };
1320 let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
1321
1322 let cancel_rx = self.register_cancel(&graph_id).await;
1323 let cancel_registry = self.cancel_registry.clone();
1324 let graph = Arc::new(self.graph_runtime.open_graph(graph_id.clone()));
1325 let claudecode = self.claudecode.clone();
1326 let arbor_storage = self.arbor_storage.clone();
1327 let loopback_storage = self.loopback.storage();
1328 let pm = self.pm.clone();
1329 let graph_runtime = self.graph_runtime.clone();
1330 stream! {
1331 let execution = graph_runner::run_graph_execution(
1332 graph,
1333 claudecode,
1334 arbor_storage,
1335 loopback_storage,
1336 pm,
1337 graph_runtime,
1338 cancel_registry.clone(),
1339 model_enum,
1340 wd,
1341 cancel_rx,
1342 std::collections::HashMap::new(),
1343 );
1344 tokio::pin!(execution);
1345 while let Some(event) = execution.next().await {
1346 yield event;
1347 }
1348 cancel_registry.lock().await.remove(&graph_id);
1349 }
1350 }
1351
    #[plexus_macros::method(params(
        task = "Natural-language task — passed directly to Claude as the planning prompt",
        model = "Model for all nodes: opus, sonnet, haiku (default: sonnet)",
        working_directory = "Working directory for task nodes (default: /workspace)"
    ))]
    /// Create a fresh graph containing a single plan node for `task` and
    /// execute it inline, streaming events until the run finishes.
    ///
    /// The plan node is recorded in the PM ticket map under the fixed ticket
    /// id "plan" so subscribers can resolve node events back to a ticket.
    async fn run_plan(
        &self,
        task: String,
        model: Option<String>,
        working_directory: Option<String>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        // Keep both the raw string (stored in graph metadata) and the enum
        // (passed to the runner); unknown model names fall back to Sonnet.
        let model_str = model.as_deref().unwrap_or("sonnet").to_string();
        let model_enum = match model_str.as_str() {
            "opus" => Model::Opus,
            "haiku" => Model::Haiku,
            _ => Model::Sonnet,
        };
        let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
        // Clone shared handles so the stream below can be 'static.
        let graph_runtime = self.graph_runtime.clone();
        let cancel_registry = self.cancel_registry.clone();
        let claudecode = self.claudecode.clone();
        let arbor = self.arbor_storage.clone();
        let lb = self.loopback.storage();
        let pm = self.pm.clone();

        stream! {
            // Persist the run configuration in graph metadata under the
            // reserved "_plexus_run_config" key.
            let metadata = serde_json::json!({
                "_plexus_run_config": {
                    "model": model_str,
                    "working_directory": wd,
                }
            });
            let graph = match graph_runtime.create_graph(metadata).await {
                Ok(g) => Arc::new(g),
                Err(e) => {
                    // No graph id exists yet, so the session id is empty here.
                    yield OrchaEvent::Failed { session_id: String::new(), error: e };
                    return;
                }
            };
            let graph_id = graph.graph_id.clone();

            let node_id = match graph.add_plan(task.clone()).await {
                Ok(id) => id,
                Err(e) => {
                    yield OrchaEvent::Failed { session_id: graph_id, error: e };
                    return;
                }
            };

            // Persist ticket-id -> node-id ("plan" -> plan node) plus the raw
            // task text; save failures are deliberately best-effort.
            let ticket_map: std::collections::HashMap<String, String> =
                [("plan".to_string(), node_id.clone())].into_iter().collect();
            let _ = pm.save_ticket_map(&graph_id, &ticket_map).await;
            let _ = pm.save_ticket_source(&graph_id, &task).await;

            // Register cancellation before starting execution so cancel_graph
            // can stop this run.
            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);

            // The runner wants the reverse mapping: node-id -> ticket-id.
            let node_to_ticket: std::collections::HashMap<String, String> =
                [(node_id, "plan".to_string())].into_iter().collect();
            let execution = graph_runner::run_graph_execution(
                graph, claudecode, arbor, lb, pm,
                graph_runtime, cancel_registry.clone(),
                model_enum, wd, cancel_rx, node_to_ticket,
            );
            tokio::pin!(execution);
            // Forward all execution events, then clear the registry entry.
            while let Some(event) = execution.next().await {
                yield event;
            }
            cancel_registry.lock().await.remove(&graph_id);
        }
    }
1431
1432 #[plexus_macros::method(params(
1441 graph_id = "Lattice graph ID to cancel"
1442 ))]
1443 async fn cancel_graph(
1444 &self,
1445 graph_id: String,
1446 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1447 let cancel_registry = self.cancel_registry.clone();
1448 let lattice_storage = self.graph_runtime.storage();
1449 stream! {
1450 let mut all_graph_ids: Vec<String> = Vec::new();
1452 let mut to_visit: std::collections::VecDeque<String> = std::collections::VecDeque::new();
1453 to_visit.push_back(graph_id.clone());
1454 while let Some(gid) = to_visit.pop_front() {
1455 all_graph_ids.push(gid.clone());
1456 if let Ok(children) = lattice_storage.get_child_graphs(&gid).await {
1457 for child in children {
1458 to_visit.push_back(child.id);
1459 }
1460 }
1461 }
1462
1463 let mut registry = cancel_registry.lock().await;
1465 let root_cancelled = registry.contains_key(&graph_id);
1466 for gid in all_graph_ids {
1467 if let Some(cancel_tx) = registry.remove(&gid) {
1468 let _ = cancel_tx.send(true);
1469 }
1470 }
1471
1472 if root_cancelled {
1473 yield OrchaEvent::Cancelled { graph_id };
1474 } else {
1475 yield OrchaEvent::Failed {
1476 session_id: graph_id,
1477 error: "Graph not found in cancel registry (not running or already finished)".to_string(),
1478 };
1479 }
1480 }
1481 }
1482
    #[plexus_macros::method(params(
        graph_id = "Lattice graph ID from run_tickets_async or build_tickets",
        after_seq = "Sequence number to resume from (0 or omit to start from beginning)"
    ))]
    /// Watch a graph's lattice event log and translate it into OrchaEvents,
    /// enriching node events with ticket ids and a coarse percent-complete.
    /// Ends when the graph reports GraphDone or GraphFailed.
    async fn subscribe_graph(
        &self,
        graph_id: String,
        after_seq: Option<u64>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let graph = self.graph_runtime.open_graph(graph_id.clone());
        let pm = self.pm.clone();
        stream! {
            // The PM stores ticket-id -> node-id; invert it so incoming node
            // events can be mapped back to their ticket. Missing map => empty.
            let node_to_ticket: HashMap<String, String> = pm.get_ticket_map(&graph_id).await
                .unwrap_or_default()
                .into_iter()
                .map(|(ticket_id, node_id)| (node_id, ticket_id))
                .collect();

            let event_stream = graph.watch(after_seq);
            tokio::pin!(event_stream);

            // Progress is complete_nodes / total_nodes; a count failure is
            // treated as 0 total, which disables percentages below.
            let total_nodes: usize = graph.count_nodes().await.unwrap_or(0);
            let mut complete_nodes: usize = 0;

            // None when total is unknown/zero so clients can hide the bar.
            fn calc_percentage(complete: usize, total: usize) -> Option<u32> {
                if total == 0 { return None; }
                Some((complete as f32 / total as f32 * 100.0) as u32)
            }

            while let Some(crate::activations::lattice::LatticeEventEnvelope { event, .. }) = event_stream.next().await {
                match event {
                    // NodeReady (scheduled) is surfaced to clients as the
                    // "started" signal; the later NodeStarted is dropped.
                    crate::activations::lattice::LatticeEvent::NodeReady { node_id, .. } => {
                        let ticket_id = node_to_ticket.get(&node_id).cloned();
                        yield OrchaEvent::NodeStarted {
                            node_id,
                            label: None,
                            ticket_id,
                            percentage: calc_percentage(complete_nodes, total_nodes),
                        };
                    }
                    // Intentionally ignored: NodeReady above already emitted
                    // the start event for this node.
                    crate::activations::lattice::LatticeEvent::NodeStarted { .. } => {
                    }
                    crate::activations::lattice::LatticeEvent::NodeDone { node_id, .. } => {
                        complete_nodes += 1;
                        let ticket_id = node_to_ticket.get(&node_id).cloned();
                        yield OrchaEvent::NodeComplete {
                            node_id,
                            label: None,
                            ticket_id,
                            output_summary: None,
                            percentage: calc_percentage(complete_nodes, total_nodes),
                        };
                    }
                    // Failed nodes still count toward progress so the bar
                    // keeps moving on partially failing runs.
                    crate::activations::lattice::LatticeEvent::NodeFailed { node_id, error } => {
                        complete_nodes += 1;
                        let ticket_id = node_to_ticket.get(&node_id).cloned();
                        yield OrchaEvent::NodeFailed {
                            node_id,
                            label: None,
                            ticket_id,
                            error,
                            percentage: calc_percentage(complete_nodes, total_nodes),
                        };
                    }
                    // Terminal events: emit the Orcha-level result and stop.
                    crate::activations::lattice::LatticeEvent::GraphDone { graph_id } => {
                        yield OrchaEvent::Complete { session_id: graph_id };
                        return;
                    }
                    crate::activations::lattice::LatticeEvent::GraphFailed { graph_id, node_id, error } => {
                        yield OrchaEvent::Failed {
                            session_id: graph_id,
                            error: format!("Node {} failed: {}", node_id, error),
                        };
                        return;
                    }
                }
            }
        }
    }
1583
    #[plexus_macros::method(params(
        graph_id = "Root graph ID to watch (recursively includes all child graphs)",
        after_seq = "Sequence number for the root graph to resume from (0 or omit)"
    ))]
    /// Watch a root graph and, via periodic discovery, every child graph that
    /// appears under it, merging all their events into one stream. The stream
    /// ends when the *root* graph completes or fails; background watcher and
    /// discovery tasks then exit on their own once the channel closes.
    async fn watch_graph_tree(
        &self,
        graph_id: String,
        after_seq: Option<u64>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        let graph_runtime = self.graph_runtime.clone();
        let pm = self.pm.clone();
        let root_id = graph_id.clone();
        stream! {
            // All per-graph watcher tasks fan their events into this channel.
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<OrchaEvent>();
            // Set of graph ids that already have a watcher task, shared with
            // the discovery loop below.
            let known_ids: Arc<tokio::sync::Mutex<std::collections::HashSet<String>>> =
                Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new()));

            known_ids.lock().await.insert(root_id.clone());
            // Watcher for the root graph; only the root honors after_seq.
            {
                let gr = graph_runtime.clone();
                let pm_w = pm.clone();
                let tx_w = tx.clone();
                let rid = root_id.clone();
                tokio::spawn(async move {
                    watch_single_graph(rid, after_seq, gr, pm_w, tx_w).await;
                });
            }

            // Discovery task: every 500ms, scan the known graphs for newly
            // attached children and spawn a watcher for each unseen one.
            {
                let lattice_storage = graph_runtime.storage();
                let known = known_ids.clone();
                let tx_disc = tx.clone();
                let gr_disc = graph_runtime.clone();
                let pm_disc = pm.clone();
                tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                        // The consumer side closed (stream finished): stop.
                        if tx_disc.is_closed() {
                            break;
                        }
                        // Snapshot to avoid holding the lock across awaits.
                        let current_known: Vec<String> =
                            known.lock().await.iter().cloned().collect();
                        for gid in current_known {
                            if let Ok(children) = lattice_storage.get_child_graphs(&gid).await {
                                for child in children {
                                    let mut guard = known.lock().await;
                                    if !guard.contains(&child.id) {
                                        // Mark known before spawning so the
                                        // next scan doesn't double-watch it.
                                        guard.insert(child.id.clone());
                                        drop(guard);
                                        let gr = gr_disc.clone();
                                        let pm_c = pm_disc.clone();
                                        let tx_c = tx_disc.clone();
                                        let cid = child.id;
                                        tokio::spawn(async move {
                                            // Children start from the log head.
                                            watch_single_graph(cid, None, gr, pm_c, tx_c).await;
                                        });
                                    }
                                }
                            }
                        }
                    }
                });
            }

            // Forward merged events; stop once the ROOT graph terminates
            // (child terminal events pass through without ending the stream).
            while let Some(event) = rx.recv().await {
                let is_root_terminal = matches!(&event,
                    OrchaEvent::Complete { session_id } | OrchaEvent::Failed { session_id, .. }
                    if session_id == &root_id
                );
                yield event;
                if is_root_terminal {
                    break;
                }
            }
        }
    }
1672
1673 #[plexus_macros::method(params(
1677 metadata = "Arbitrary JSON metadata attached to the graph"
1678 ))]
1679 async fn create_graph(
1680 &self,
1681 metadata: Value,
1682 ) -> impl Stream<Item = OrchaCreateGraphResult> + Send + 'static {
1683 let graph_runtime = self.graph_runtime.clone();
1684 stream! {
1685 match graph_runtime.create_graph(metadata).await {
1686 Ok(graph) => yield OrchaCreateGraphResult::Ok { graph_id: graph.graph_id },
1687 Err(e) => yield OrchaCreateGraphResult::Err { message: e },
1688 }
1689 }
1690 }
1691
1692 #[plexus_macros::method(params(
1694 graph_id = "Graph to add the node to",
1695 task = "Prompt for Claude to execute"
1696 ))]
1697 async fn add_task_node(
1698 &self,
1699 graph_id: String,
1700 task: String,
1701 ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1702 let graph = self.graph_runtime.open_graph(graph_id);
1703 stream! {
1704 match graph.add_task(task, None).await {
1705 Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1706 Err(e) => yield OrchaAddNodeResult::Err { message: e },
1707 }
1708 }
1709 }
1710
1711 #[plexus_macros::method(params(
1713 graph_id = "Graph to add the node to",
1714 task = "Synthesis prompt for Claude"
1715 ))]
1716 async fn add_synthesize_node(
1717 &self,
1718 graph_id: String,
1719 task: String,
1720 ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1721 let graph = self.graph_runtime.open_graph(graph_id);
1722 stream! {
1723 match graph.add_synthesize(task, None).await {
1724 Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1725 Err(e) => yield OrchaAddNodeResult::Err { message: e },
1726 }
1727 }
1728 }
1729
1730 #[plexus_macros::method(params(
1732 graph_id = "Graph to add the node to",
1733 command = "Shell command to validate (exit 0 = pass)",
1734 cwd = "Working directory (default: /workspace)"
1735 ))]
1736 async fn add_validate_node(
1737 &self,
1738 graph_id: String,
1739 command: String,
1740 cwd: Option<String>,
1741 ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1742 let graph = self.graph_runtime.open_graph(graph_id);
1743 stream! {
1744 match graph.add_validate(command, cwd, None).await {
1745 Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1746 Err(e) => yield OrchaAddNodeResult::Err { message: e },
1747 }
1748 }
1749 }
1750
1751 #[plexus_macros::method(params(
1753 graph_id = "Graph to add the node to",
1754 strategy = "Gather strategy: {\"type\":\"all\"} or {\"type\":\"first\",\"n\":N}"
1755 ))]
1756 async fn add_gather_node(
1757 &self,
1758 graph_id: String,
1759 strategy: GatherStrategy,
1760 ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1761 let graph = self.graph_runtime.open_graph(graph_id);
1762 stream! {
1763 match graph.add_gather(strategy).await {
1764 Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1765 Err(e) => yield OrchaAddNodeResult::Err { message: e },
1766 }
1767 }
1768 }
1769
1770 #[plexus_macros::method(params(
1775 graph_id = "Graph to add the node to",
1776 child_graph_id = "ID of the graph to run as a sub-graph"
1777 ))]
1778 async fn add_subgraph_node(
1779 &self,
1780 graph_id: String,
1781 child_graph_id: String,
1782 ) -> impl Stream<Item = OrchaAddNodeResult> + Send + 'static {
1783 let graph = self.graph_runtime.open_graph(graph_id);
1784 stream! {
1785 match graph.add_subgraph(child_graph_id).await {
1786 Ok(node_id) => yield OrchaAddNodeResult::Ok { node_id },
1787 Err(e) => yield OrchaAddNodeResult::Err { message: e },
1788 }
1789 }
1790 }
1791
1792 #[plexus_macros::method(params(
1794 graph_id = "Graph containing both nodes",
1795 dependent_node_id = "Node that must wait",
1796 dependency_node_id = "Node that must complete first"
1797 ))]
1798 async fn add_dependency(
1799 &self,
1800 graph_id: String,
1801 dependent_node_id: String,
1802 dependency_node_id: String,
1803 ) -> impl Stream<Item = OrchaAddDependencyResult> + Send + 'static {
1804 let graph = self.graph_runtime.open_graph(graph_id);
1805 stream! {
1806 match graph.depends_on(&dependent_node_id, &dependency_node_id).await {
1807 Ok(()) => yield OrchaAddDependencyResult::Ok,
1808 Err(e) => yield OrchaAddDependencyResult::Err { message: e },
1809 }
1810 }
1811 }
1812
1813 #[plexus_macros::method(params(
1818 tickets = "Raw ticket file content",
1819 metadata = "Arbitrary JSON metadata attached to the graph"
1820 ))]
1821 async fn build_tickets(
1822 &self,
1823 tickets: String,
1824 metadata: Value,
1825 ) -> impl Stream<Item = OrchaCreateGraphResult> + Send + 'static {
1826 let graph_runtime = self.graph_runtime.clone();
1827 let pm = self.pm.clone();
1828 stream! {
1829 let compiled = match ticket_compiler::compile_tickets(&tickets) {
1830 Ok(c) => c,
1831 Err(e) => {
1832 yield OrchaCreateGraphResult::Err {
1833 message: format!("Ticket compile error: {}", e),
1834 };
1835 return;
1836 }
1837 };
1838 match build_graph_from_definition(
1839 graph_runtime, metadata, compiled.nodes, compiled.edges,
1840 ).await {
1841 Ok((graph_id, id_map)) => {
1842 let _ = pm.save_ticket_map(&graph_id, &id_map).await;
1843 yield OrchaCreateGraphResult::Ok { graph_id };
1844 }
1845 Err(e) => yield OrchaCreateGraphResult::Err { message: e },
1846 }
1847 }
1848 }
1849
1850 #[plexus_macros::method(params(
1864 tickets = "Raw ticket file content",
1865 metadata = "Arbitrary JSON metadata attached to the graph",
1866 model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
1867 working_directory = "Working directory for task nodes (default: /workspace)"
1868 ))]
1869 async fn run_tickets(
1870 &self,
1871 tickets: String,
1872 metadata: Value,
1873 model: Option<String>,
1874 working_directory: Option<String>,
1875 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
1876 let graph_runtime = self.graph_runtime.clone();
1877 let claudecode = self.claudecode.clone();
1878 let arbor_storage = self.arbor_storage.clone();
1879 let loopback_storage = self.loopback.storage();
1880 let pm = self.pm.clone();
1881 let cancel_registry = self.cancel_registry.clone();
1882 stream! {
1883 let compiled = match ticket_compiler::compile_tickets(&tickets) {
1884 Ok(c) => c,
1885 Err(e) => {
1886 yield OrchaEvent::Failed {
1887 session_id: "tickets".to_string(),
1888 error: format!("Ticket compile error: {}", e),
1889 };
1890 return;
1891 }
1892 };
1893 let model_str = model.as_deref().unwrap_or("sonnet").to_string();
1894 let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
1895 let mut enriched_metadata = if metadata.is_object() {
1896 metadata.clone()
1897 } else {
1898 serde_json::json!({})
1899 };
1900 enriched_metadata["_plexus_run_config"] = serde_json::json!({
1901 "model": model_str,
1902 "working_directory": wd,
1903 });
1904 let (graph_id, id_map) = match build_graph_from_definition(
1905 graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
1906 ).await {
1907 Ok(pair) => pair,
1908 Err(e) => {
1909 yield OrchaEvent::Failed {
1910 session_id: "tickets".to_string(),
1911 error: e,
1912 };
1913 return;
1914 }
1915 };
1916 let _ = pm.save_ticket_map(&graph_id, &id_map).await;
1917 let _ = pm.save_ticket_source(&graph_id, &tickets).await;
1918
1919 yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
1920
1921 let model_enum = match model_str.as_str() {
1922 "opus" => Model::Opus,
1923 "haiku" => Model::Haiku,
1924 _ => Model::Sonnet,
1925 };
1926
1927 if !std::path::Path::new(&wd).is_dir() {
1931 yield OrchaEvent::Failed {
1932 session_id: "tickets".to_string(),
1933 error: format!(
1934 "Working directory does not exist: '{}'. \
1935 Create it before running tickets or pass an existing path.",
1936 wd
1937 ),
1938 };
1939 return;
1940 }
1941
1942 let node_to_ticket: std::collections::HashMap<String, String> = id_map
1944 .iter()
1945 .map(|(ticket, node)| (node.clone(), ticket.clone()))
1946 .collect();
1947
1948 let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
1949
1950 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1952 cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
1953
1954 tokio::spawn(async move {
1957 let execution = graph_runner::run_graph_execution(
1958 graph,
1959 claudecode,
1960 arbor_storage,
1961 loopback_storage,
1962 pm,
1963 graph_runtime,
1964 cancel_registry.clone(),
1965 model_enum,
1966 wd,
1967 cancel_rx,
1968 node_to_ticket,
1969 );
1970 tokio::pin!(execution);
1971 while let Some(event) = execution.next().await {
1972 match &event {
1973 OrchaEvent::Failed { error, .. } => {
1974 tracing::error!("run_tickets graph {} failed: {}", graph_id, error);
1975 }
1976 OrchaEvent::Complete { .. } => {
1977 tracing::info!("run_tickets graph {} complete", graph_id);
1978 }
1979 _ => {}
1980 }
1981 }
1982 cancel_registry.lock().await.remove(&graph_id);
1983 });
1984 }
1985 }
1986
    #[plexus_macros::method(params(
        tickets = "Raw ticket file content",
        metadata = "Arbitrary JSON metadata",
        model = "Model: opus, sonnet, haiku (default: sonnet)",
        working_directory = "Working directory (default: /workspace)"
    ))]
    /// Compile a ticket file, build a graph, and execute it on a detached
    /// background task. The stream ends right after `GraphStarted`; clients
    /// follow execution via `subscribe_graph`.
    async fn run_tickets_async(
        &self,
        tickets: String,
        metadata: Value,
        model: Option<String>,
        working_directory: Option<String>,
    ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
        // Clone shared handles so the stream (and spawned task) are 'static.
        let graph_runtime = self.graph_runtime.clone();
        let claudecode = self.claudecode.clone();
        let arbor_storage = self.arbor_storage.clone();
        let loopback_storage = self.loopback.storage();
        let pm = self.pm.clone();
        let cancel_registry = self.cancel_registry.clone();
        stream! {
            // Parse the ticket markup first; nothing is persisted on failure.
            let compiled = match ticket_compiler::compile_tickets(&tickets) {
                Ok(c) => c,
                Err(e) => {
                    yield OrchaEvent::Failed {
                        session_id: "tickets".to_string(),
                        error: format!("Ticket compile error: {}", e),
                    };
                    return;
                }
            };

            let model_str = model.as_deref().unwrap_or("sonnet").to_string();
            let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());

            // Validate the working directory before any graph is created so a
            // bad path cannot leave a dangling persisted graph behind.
            if !std::path::Path::new(&wd).is_dir() {
                yield OrchaEvent::Failed {
                    session_id: "tickets".to_string(),
                    error: format!(
                        "Working directory does not exist: '{}'. \
                         Create it before running tickets or pass an existing path.",
                        wd
                    ),
                };
                return;
            }

            // Attach the run configuration under the reserved metadata key;
            // non-object metadata is replaced with an empty object.
            let mut enriched_metadata = if metadata.is_object() {
                metadata.clone()
            } else {
                serde_json::json!({})
            };
            enriched_metadata["_plexus_run_config"] = serde_json::json!({
                "model": model_str,
                "working_directory": wd,
            });

            let (graph_id, id_map) = match build_graph_from_definition(
                graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
            ).await {
                Ok(pair) => pair,
                Err(e) => {
                    yield OrchaEvent::Failed {
                        session_id: "tickets".to_string(),
                        error: e,
                    };
                    return;
                }
            };

            // Best-effort persistence of the ticket map and the raw source.
            let _ = pm.save_ticket_map(&graph_id, &id_map).await;
            let _ = pm.save_ticket_source(&graph_id, &tickets).await;

            yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };

            let model_enum = match model_str.as_str() {
                "opus" => Model::Opus,
                "haiku" => Model::Haiku,
                _ => Model::Sonnet,
            };

            let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));

            // Runner expects node-id -> ticket-id (inverse of the saved map).
            let node_to_ticket: std::collections::HashMap<String, String> = id_map
                .iter()
                .map(|(ticket, node)| (node.clone(), ticket.clone()))
                .collect();

            // Register cancellation before spawning so cancel_graph works.
            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
            cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);

            // Detached execution: events are only logged here; clients follow
            // progress via subscribe_graph.
            tokio::spawn(async move {
                let execution = graph_runner::run_graph_execution(
                    graph,
                    claudecode,
                    arbor_storage,
                    loopback_storage,
                    pm,
                    graph_runtime,
                    cancel_registry.clone(),
                    model_enum,
                    wd,
                    cancel_rx,
                    node_to_ticket,
                );
                tokio::pin!(execution);
                while let Some(event) = execution.next().await {
                    match &event {
                        OrchaEvent::Failed { error, .. } => {
                            tracing::error!(
                                "run_tickets_async graph {} failed: {}",
                                graph_id, error
                            );
                        }
                        OrchaEvent::Complete { .. } => {
                            tracing::info!("run_tickets_async graph {} complete", graph_id);
                        }
                        _ => {}
                    }
                }
                // Run finished or was cancelled: drop the registry entry.
                cancel_registry.lock().await.remove(&graph_id);
            });
        }
    }
2123
2124 #[plexus_macros::method(params(
2132 paths = "Absolute paths to ticket markdown files, e.g. [\"/workspace/plans/batch.tickets.md\"]",
2133 metadata = "Arbitrary JSON metadata attached to the graph",
2134 model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2135 working_directory = "Working directory for task nodes (default: /workspace)"
2136 ))]
2137 async fn run_tickets_files(
2138 &self,
2139 paths: Vec<String>,
2140 metadata: Value,
2141 model: Option<String>,
2142 working_directory: Option<String>,
2143 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2144 let graph_runtime = self.graph_runtime.clone();
2145 let claudecode = self.claudecode.clone();
2146 let arbor_storage = self.arbor_storage.clone();
2147 let loopback_storage = self.loopback.storage();
2148 let pm = self.pm.clone();
2149 let cancel_registry = self.cancel_registry.clone();
2150 stream! {
2151 let mut parts: Vec<String> = Vec::new();
2153 for path in &paths {
2154 match tokio::fs::read_to_string(path).await {
2155 Ok(content) => parts.push(content),
2156 Err(e) => {
2157 yield OrchaEvent::Failed {
2158 session_id: "tickets".to_string(),
2159 error: format!("Failed to read '{}': {}", path, e),
2160 };
2161 return;
2162 }
2163 }
2164 }
2165 let tickets = parts.join("\n\n");
2166
2167 let compiled = match ticket_compiler::compile_tickets(&tickets) {
2168 Ok(c) => c,
2169 Err(e) => {
2170 yield OrchaEvent::Failed {
2171 session_id: "tickets".to_string(),
2172 error: format!("Ticket compile error: {}", e),
2173 };
2174 return;
2175 }
2176 };
2177 let model_str = model.as_deref().unwrap_or("sonnet").to_string();
2178 let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
2179 let mut enriched_metadata = if metadata.is_object() { metadata.clone() } else { serde_json::json!({}) };
2180 enriched_metadata["_plexus_run_config"] = serde_json::json!({
2181 "model": model_str,
2182 "working_directory": wd,
2183 });
2184 let (graph_id, id_map) = match build_graph_from_definition(
2185 graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
2186 ).await {
2187 Ok(pair) => pair,
2188 Err(e) => {
2189 yield OrchaEvent::Failed { session_id: "tickets".to_string(), error: e };
2190 return;
2191 }
2192 };
2193 let _ = pm.save_ticket_map(&graph_id, &id_map).await;
2194 let _ = pm.save_ticket_source(&graph_id, &tickets).await;
2195
2196 yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
2197
2198 let model_enum = match model_str.as_str() {
2199 "opus" => Model::Opus,
2200 "haiku" => Model::Haiku,
2201 _ => Model::Sonnet,
2202 };
2203 if !std::path::Path::new(&wd).is_dir() {
2204 yield OrchaEvent::Failed {
2205 session_id: "tickets".to_string(),
2206 error: format!("Working directory does not exist: '{}'", wd),
2207 };
2208 return;
2209 }
2210 let node_to_ticket: std::collections::HashMap<String, String> = id_map
2211 .iter().map(|(t, n)| (n.clone(), t.clone())).collect();
2212 let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
2213 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2214 cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
2215 let execution = graph_runner::run_graph_execution(
2216 graph, claudecode, arbor_storage, loopback_storage, pm,
2217 graph_runtime, cancel_registry.clone(),
2218 model_enum, wd, cancel_rx, node_to_ticket,
2219 );
2220 tokio::pin!(execution);
2221 while let Some(event) = execution.next().await {
2222 yield event;
2223 }
2224 cancel_registry.lock().await.remove(&graph_id);
2225 }
2226 }
2227
2228 #[plexus_macros::method(params(
2232 paths = "Absolute paths to ticket markdown files",
2233 metadata = "Arbitrary JSON metadata attached to the graph",
2234 model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2235 working_directory = "Working directory for task nodes (default: /workspace)"
2236 ))]
2237 async fn run_tickets_async_files(
2238 &self,
2239 paths: Vec<String>,
2240 metadata: Value,
2241 model: Option<String>,
2242 working_directory: Option<String>,
2243 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2244 let graph_runtime = self.graph_runtime.clone();
2245 let claudecode = self.claudecode.clone();
2246 let arbor_storage = self.arbor_storage.clone();
2247 let loopback_storage = self.loopback.storage();
2248 let pm = self.pm.clone();
2249 let cancel_registry = self.cancel_registry.clone();
2250 stream! {
2251 let mut parts: Vec<String> = Vec::new();
2252 for path in &paths {
2253 match tokio::fs::read_to_string(path).await {
2254 Ok(content) => parts.push(content),
2255 Err(e) => {
2256 yield OrchaEvent::Failed {
2257 session_id: "tickets".to_string(),
2258 error: format!("Failed to read '{}': {}", path, e),
2259 };
2260 return;
2261 }
2262 }
2263 }
2264 let tickets = parts.join("\n\n");
2265
2266 let compiled = match ticket_compiler::compile_tickets(&tickets) {
2267 Ok(c) => c,
2268 Err(e) => {
2269 yield OrchaEvent::Failed {
2270 session_id: "tickets".to_string(),
2271 error: format!("Ticket compile error: {}", e),
2272 };
2273 return;
2274 }
2275 };
2276 let model_str = model.as_deref().unwrap_or("sonnet").to_string();
2277 let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());
2278 let mut enriched_metadata = if metadata.is_object() { metadata.clone() } else { serde_json::json!({}) };
2279 enriched_metadata["_plexus_run_config"] = serde_json::json!({
2280 "model": model_str,
2281 "working_directory": wd,
2282 });
2283 let (graph_id, id_map) = match build_graph_from_definition(
2284 graph_runtime.clone(), enriched_metadata, compiled.nodes, compiled.edges,
2285 ).await {
2286 Ok(pair) => pair,
2287 Err(e) => {
2288 yield OrchaEvent::Failed { session_id: "tickets".to_string(), error: e };
2289 return;
2290 }
2291 };
2292 let _ = pm.save_ticket_map(&graph_id, &id_map).await;
2293 let _ = pm.save_ticket_source(&graph_id, &tickets).await;
2294
2295 yield OrchaEvent::GraphStarted { graph_id: graph_id.clone() };
2296
2297 let model_enum = match model_str.as_str() {
2298 "opus" => Model::Opus,
2299 "haiku" => Model::Haiku,
2300 _ => Model::Sonnet,
2301 };
2302 if !std::path::Path::new(&wd).is_dir() {
2303 yield OrchaEvent::Failed {
2304 session_id: "tickets".to_string(),
2305 error: format!("Working directory does not exist: '{}'", wd),
2306 };
2307 return;
2308 }
2309 let node_to_ticket: std::collections::HashMap<String, String> = id_map
2310 .iter().map(|(t, n)| (n.clone(), t.clone())).collect();
2311 let graph = Arc::new(graph_runtime.open_graph(graph_id.clone()));
2312 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2313 cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);
2314 tokio::spawn(async move {
2315 let execution = graph_runner::run_graph_execution(
2316 graph, claudecode, arbor_storage, loopback_storage, pm,
2317 graph_runtime, cancel_registry.clone(),
2318 model_enum, wd, cancel_rx, node_to_ticket,
2319 );
2320 tokio::pin!(execution);
2321 while let Some(event) = execution.next().await {
2322 match &event {
2323 OrchaEvent::Failed { error, .. } => {
2324 tracing::error!("run_tickets_async_files graph {} failed: {}", graph_id, error);
2325 }
2326 OrchaEvent::Complete { .. } => {
2327 tracing::info!("run_tickets_async_files graph {} complete", graph_id);
2328 }
2329 _ => {}
2330 }
2331 }
2332 cancel_registry.lock().await.remove(&graph_id);
2333 });
2334 }
2335 }
2336
2337 #[plexus_macros::method(params(
2342 metadata = "Arbitrary JSON metadata attached to the graph",
2343 model = "Model for task nodes: opus, sonnet, haiku (default: sonnet)",
2344 working_directory = "Working directory for task nodes (default: /workspace)",
2345 nodes = "Array of OrchaNodeDef: [{\"id\":\"...\",\"spec\":{\"type\":\"task\",\"task\":\"...\"}}]",
2346 edges = "Array of OrchaEdgeDef: [{\"from\":\"id1\",\"to\":\"id2\"}]"
2347 ))]
2348 async fn run_graph_definition(
2349 &self,
2350 metadata: Value,
2351 model: Option<String>,
2352 working_directory: Option<String>,
2353 nodes: Vec<OrchaNodeDef>,
2354 edges: Vec<OrchaEdgeDef>,
2355 ) -> impl Stream<Item = OrchaEvent> + Send + 'static {
2356 build_and_run_graph_definition(
2357 self.graph_runtime.clone(),
2358 self.claudecode.clone(),
2359 self.arbor_storage.clone(),
2360 self.loopback.storage(),
2361 self.cancel_registry.clone(),
2362 self.pm.clone(),
2363 metadata,
2364 model,
2365 working_directory,
2366 nodes,
2367 edges,
2368 )
2369 }
2370}
2371
2372async fn build_graph_from_definition(
2384 graph_runtime: Arc<GraphRuntime>,
2385 metadata: Value,
2386 nodes: Vec<OrchaNodeDef>,
2387 edges: Vec<OrchaEdgeDef>,
2388) -> Result<(String, HashMap<String, String>), String> {
2389 let graph = graph_runtime
2390 .create_graph(metadata)
2391 .await
2392 .map_err(|e| format!("Failed to create graph: {}", e))?;
2393 let graph_id = graph.graph_id.clone();
2394
2395 let mut id_map: HashMap<String, String> = HashMap::new();
2396 for OrchaNodeDef { id, spec } in nodes {
2397 let result = match spec {
2398 OrchaNodeSpec::Task { task, max_retries } => graph.add_task(task, max_retries).await,
2399 OrchaNodeSpec::Synthesize { task, max_retries } => graph.add_synthesize(task, max_retries).await,
2400 OrchaNodeSpec::Validate { command, cwd, max_retries } => graph.add_validate(command, cwd, max_retries).await,
2401 OrchaNodeSpec::Gather { strategy } => graph.add_gather(strategy).await,
2402 OrchaNodeSpec::Review { prompt } => graph.add_review(prompt).await,
2403 OrchaNodeSpec::Plan { task } => graph.add_plan(task).await,
2404 };
2405 let lattice_id = match result {
2406 Ok(lid) => lid,
2407 Err(e) => return Err(format!("Failed to add node '{}': {}", id, e)),
2408 };
2409 id_map.insert(id, lattice_id);
2410 }
2411
2412 for OrchaEdgeDef { from, to } in edges {
2413 let dep_id = id_map
2414 .get(&from)
2415 .ok_or_else(|| format!("Unknown node id in edge.from: '{}'", from))?
2416 .clone();
2417 let node_id = id_map
2418 .get(&to)
2419 .ok_or_else(|| format!("Unknown node id in edge.to: '{}'", to))?
2420 .clone();
2421 graph
2422 .depends_on(&node_id, &dep_id)
2423 .await
2424 .map_err(|e| format!("Failed to add edge {} → {}: {}", from, to, e))?;
2425 }
2426
2427 Ok((graph_id, id_map))
2428}
2429
/// Build a graph from `nodes`/`edges` and stream its full execution as
/// `OrchaEvent`s.
///
/// The returned stream:
/// 1. builds the graph (yielding a single `Failed` event and stopping on error),
/// 2. yields a `Progress` event once the graph is ready,
/// 3. registers a cancellation watch channel under the graph id,
/// 4. forwards every event from `graph_runner::run_graph_execution`,
/// 5. removes the cancel-registry entry when execution completes.
///
/// NOTE(review): if the consumer drops the stream before it finishes, the
/// final `remove` never runs and the registry keeps the entry — presumably
/// acceptable, but confirm against how cancellation entries are pruned.
fn build_and_run_graph_definition<P: HubContext + 'static>(
    graph_runtime: Arc<GraphRuntime>,
    claudecode: Arc<ClaudeCode<P>>,
    arbor_storage: Arc<crate::activations::arbor::ArborStorage>,
    loopback_storage: Arc<crate::activations::claudecode_loopback::LoopbackStorage>,
    cancel_registry: CancelRegistry,
    pm: Arc<super::pm::Pm>,
    metadata: Value,
    model: Option<String>,
    working_directory: Option<String>,
    nodes: Vec<OrchaNodeDef>,
    edges: Vec<OrchaEdgeDef>,
) -> impl Stream<Item = OrchaEvent> + Send + 'static {
    stream! {
        // Build the graph first; a build failure is reported as a terminal
        // Failed event (with a placeholder session id) rather than a panic.
        let (graph_id, _) = match build_graph_from_definition(
            graph_runtime.clone(), metadata, nodes, edges,
        ).await {
            Ok(pair) => pair,
            Err(e) => {
                yield OrchaEvent::Failed {
                    session_id: "graph_definition".to_string(),
                    error: e,
                };
                return;
            }
        };

        yield OrchaEvent::Progress {
            message: format!("Graph {} ready, starting execution", graph_id),
            percentage: None,
        };

        // Unknown model strings fall back to Sonnet (the documented default).
        let model_enum = match model.as_deref().unwrap_or("sonnet") {
            "opus" => Model::Opus,
            "haiku" => Model::Haiku,
            _ => Model::Sonnet,
        };
        let wd = working_directory.unwrap_or_else(|| "/workspace".to_string());

        // Register a cancellation channel keyed by graph id so external
        // callers can cancel this run; the receiver goes to the runner.
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        cancel_registry.lock().await.insert(graph_id.clone(), cancel_tx);

        let execution = graph_runner::run_graph_execution(
            Arc::new(graph_runtime.open_graph(graph_id.clone())),
            claudecode,
            arbor_storage,
            loopback_storage,
            pm,
            graph_runtime,
            cancel_registry.clone(),
            model_enum,
            wd,
            cancel_rx,
            std::collections::HashMap::new(),
        );
        tokio::pin!(execution);
        // Forward every execution event to our consumer.
        while let Some(event) = execution.next().await {
            yield event;
        }
        // Normal completion: drop the cancellation entry for this graph.
        cancel_registry.lock().await.remove(&graph_id);
    }
}
2495
2496fn extract_validation_artifact(text: &str) -> Option<ValidationArtifact> {
2502 use regex::Regex;
2504
2505 let re = match Regex::new(r#"\{"orcha_validate"\s*:\s*(\{[^}]+\})\}"#) {
2506 Ok(re) => re,
2507 Err(e) => {
2508 tracing::warn!("Failed to compile orcha_validate regex: {}", e);
2509 return None;
2510 }
2511 };
2512 let captures = re.captures(text)?;
2513 let json_str = captures.get(1)?.as_str();
2514
2515 match serde_json::from_str::<ValidationArtifact>(json_str) {
2516 Ok(artifact) => Some(artifact),
2517 Err(e) => {
2518 tracing::warn!("Failed to parse validation artifact JSON '{}': {}", json_str, e);
2519 None
2520 }
2521 }
2522}
2523
2524async fn run_validation_test(artifact: &ValidationArtifact) -> ValidationResult {
2526 let output = Command::new("sh")
2527 .arg("-c")
2528 .arg(&artifact.test_command)
2529 .current_dir(&artifact.cwd)
2530 .output()
2531 .await;
2532
2533 match output {
2534 Ok(output) => ValidationResult {
2535 success: output.status.success(),
2536 output: String::from_utf8_lossy(&output.stdout).to_string()
2537 + &String::from_utf8_lossy(&output.stderr).to_string(),
2538 exit_code: output.status.code(),
2539 },
2540 Err(e) => ValidationResult {
2541 success: false,
2542 output: format!("Failed to execute command: {}", e),
2543 exit_code: None,
2544 },
2545 }
2546}
2547
2548fn format_conversation_from_tree(tree: &crate::activations::arbor::Tree) -> String {
2552 use crate::activations::arbor::NodeType;
2553
2554 let mut output = String::new();
2555 let mut current_role = String::new();
2556 let mut message_text = String::new();
2557 let mut tool_uses = Vec::new();
2558
2559 fn walk_nodes(
2561 tree: &crate::activations::arbor::Tree,
2562 node_id: &crate::activations::arbor::NodeId,
2563 output: &mut String,
2564 current_role: &mut String,
2565 message_text: &mut String,
2566 tool_uses: &mut Vec<String>,
2567 ) {
2568 if let Some(node) = tree.nodes.get(node_id) {
2569 if let NodeType::Text { content } = &node.data {
2570 if let Ok(event) = serde_json::from_str::<serde_json::Value>(content) {
2572 if let Some(event_type) = event.get("type").and_then(|v| v.as_str()) {
2573 match event_type {
2574 "user_message" => {
2575 flush_message(output, current_role, message_text, tool_uses);
2577
2578 *current_role = "User".to_string();
2579 if let Some(content) = event.get("content").and_then(|v| v.as_str()) {
2580 *message_text = content.to_string();
2581 }
2582 }
2583 "assistant_start" => {
2584 flush_message(output, current_role, message_text, tool_uses);
2586
2587 *current_role = "Assistant".to_string();
2588 *message_text = String::new();
2589 }
2590 "content_text" => {
2591 if let Some(text) = event.get("text").and_then(|v| v.as_str()) {
2592 message_text.push_str(text);
2593 }
2594 }
2595 "content_tool_use" => {
2596 if let Some(name) = event.get("name").and_then(|v| v.as_str()) {
2597 let mut tool_str = format!("[Tool: {}]", name);
2598 if let Some(input) = event.get("input") {
2599 if let Ok(input_str) = serde_json::to_string_pretty(input) {
2600 let trimmed = if input_str.len() > 200 {
2602 format!("{}...", &input_str[..200])
2603 } else {
2604 input_str
2605 };
2606 tool_str.push_str(&format!(" {}", trimmed));
2607 }
2608 }
2609 tool_uses.push(tool_str);
2610 }
2611 }
2612 _ => {} }
2614 }
2615 }
2616 }
2617
2618 for child_id in &node.children {
2620 walk_nodes(tree, child_id, output, current_role, message_text, tool_uses);
2621 }
2622 }
2623 }
2624
2625 fn flush_message(
2626 output: &mut String,
2627 current_role: &str,
2628 message_text: &str,
2629 tool_uses: &mut Vec<String>,
2630 ) {
2631 if !current_role.is_empty() && (!message_text.is_empty() || !tool_uses.is_empty()) {
2632 output.push_str(&format!("{}:\n", current_role));
2633 if !message_text.is_empty() {
2634 output.push_str(message_text);
2635 output.push_str("\n");
2636 }
2637 for tool in tool_uses.drain(..) {
2638 output.push_str(&format!(" {}\n", tool));
2639 }
2640 output.push_str("\n");
2641 }
2642 }
2643
2644 walk_nodes(tree, &tree.root, &mut output, &mut current_role, &mut message_text, &mut tool_uses);
2646
2647 flush_message(&mut output, ¤t_role, &message_text, &mut tool_uses);
2649
2650 output
2651}
2652
2653async fn save_status_summary_to_arbor(
2658 arbor_storage: &crate::activations::arbor::ArborStorage,
2659 session_id: &str,
2660 summary: &str,
2661) -> Result<(), String> {
2662 use crate::activations::arbor::TreeId;
2663
2664 let tree_path = format!("orcha.{}.monitor", session_id);
2666 let tree_uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, tree_path.as_bytes());
2667 let monitor_tree_id = TreeId::from(tree_uuid);
2668
2669 let tree = match arbor_storage.tree_get(&monitor_tree_id).await {
2671 Ok(tree) => tree,
2672 Err(_) => {
2673 let metadata = serde_json::json!({
2675 "type": "orcha_monitor",
2676 "session_id": session_id,
2677 "tree_path": tree_path
2678 });
2679
2680 let created_tree_id = arbor_storage.tree_create_with_id(
2681 Some(monitor_tree_id),
2682 Some(metadata),
2683 "orcha",
2684 ).await.map_err(|e| e.to_string())?;
2685
2686 arbor_storage.tree_get(&created_tree_id).await
2687 .map_err(|e| e.to_string())?
2688 }
2689 };
2690
2691 let parent = tree.nodes.values()
2693 .filter(|n| matches!(n.data, crate::activations::arbor::NodeType::Text { .. }))
2694 .max_by_key(|n| n.created_at)
2695 .map(|n| n.id)
2696 .unwrap_or(tree.root);
2697
2698 let timestamp = chrono::Utc::now().to_rfc3339();
2700 let summary_content = format!(
2701 "[{}] {}\n",
2702 timestamp,
2703 summary.trim()
2704 );
2705
2706 arbor_storage.node_create_text(
2707 &tree.id,
2708 Some(parent),
2709 summary_content,
2710 None,
2711 ).await.map_err(|e| e.to_string())?;
2712
2713 Ok(())
2714}
2715
2716async fn generate_agent_summary<P: HubContext>(
2718 claudecode: &ClaudeCode<P>,
2719 arbor_storage: &crate::activations::arbor::ArborStorage,
2720 agent: AgentInfo,
2721) -> Result<AgentSummary, String> {
2722 use futures::StreamExt;
2723
2724 let cc_session = claudecode.storage.session_get_by_name(&agent.claudecode_session_id).await
2726 .map_err(|e| format!("Failed to get CC session: {}", e))?;
2727
2728 let tree = arbor_storage.tree_get(&cc_session.head.tree_id).await
2729 .map_err(|e| format!("Failed to get tree: {}", e))?;
2730
2731 let conversation = format_conversation_from_tree(&tree);
2732
2733 let summary_session = format!("orcha-agent-summary-{}", Uuid::new_v4());
2735 let summary_session_id = format!("{}-agent-summary-{}", agent.session_id, Uuid::new_v4());
2736
2737 let create_stream = claudecode.create(
2738 summary_session.clone(),
2739 "/workspace".to_string(),
2740 crate::activations::claudecode::Model::Haiku,
2741 None,
2742 Some(false),
2743 Some(summary_session_id), ).await;
2745 tokio::pin!(create_stream);
2746
2747 let mut created = false;
2749 while let Some(result) = create_stream.next().await {
2750 if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
2751 created = true;
2752 break;
2753 }
2754 }
2755
2756 if !created {
2757 return Err("Failed to create summary session".to_string());
2758 }
2759
2760 let prompt = format!(
2762 "Summarize this agent's work in 2-3 sentences:\n\n\
2763 Subtask: {}\n\
2764 State: {:?}\n\n\
2765 Conversation:\n{}\n\n\
2766 Be concise and focus on what was accomplished or is in progress.",
2767 agent.subtask,
2768 agent.state,
2769 conversation
2770 );
2771
2772 let chat_stream = claudecode.chat(summary_session, prompt, Some(true), None).await;
2773 tokio::pin!(chat_stream);
2774
2775 let mut summary = String::new();
2776 while let Some(event) = chat_stream.next().await {
2777 if let crate::activations::claudecode::ChatEvent::Content { text } = event {
2778 summary.push_str(&text);
2779 }
2780 }
2781
2782 Ok(AgentSummary {
2783 agent_id: agent.agent_id,
2784 subtask: agent.subtask,
2785 state: agent.state,
2786 summary,
2787 })
2788}
2789
2790async fn generate_overall_summary<P: HubContext>(
2792 claudecode: &ClaudeCode<P>,
2793 session_id: &SessionId,
2794 agent_summaries: &[AgentSummary],
2795) -> Option<String> {
2796 use futures::StreamExt;
2797
2798 let summary_session = format!("orcha-meta-summary-{}", Uuid::new_v4());
2799 let meta_summary_session_id = format!("{}-meta-summary-{}", session_id, Uuid::new_v4());
2800
2801 let create_stream = claudecode.create(
2803 summary_session.clone(),
2804 "/workspace".to_string(),
2805 crate::activations::claudecode::Model::Haiku,
2806 None,
2807 Some(false),
2808 Some(meta_summary_session_id), ).await;
2810 tokio::pin!(create_stream);
2811
2812 let mut created = false;
2813 while let Some(result) = create_stream.next().await {
2814 if let crate::activations::claudecode::CreateResult::Ok { .. } = result {
2815 created = true;
2816 break;
2817 }
2818 }
2819
2820 if !created {
2821 return None;
2822 }
2823
2824 let mut agent_list = String::new();
2826 for (i, summary) in agent_summaries.iter().enumerate() {
2827 agent_list.push_str(&format!(
2828 "{}. {} ({:?})\n {}\n\n",
2829 i + 1,
2830 summary.subtask,
2831 summary.state,
2832 summary.summary
2833 ));
2834 }
2835
2836 let prompt = format!(
2837 "This is a multi-agent orchestration session with {} agents working on different subtasks.\n\n\
2838 Agent summaries:\n{}\n\
2839 Provide a 2-4 sentence overall summary of the session's progress and coordination.\n\
2840 Focus on: what's the big picture? What's been accomplished? What's still in progress?",
2841 agent_summaries.len(),
2842 agent_list
2843 );
2844
2845 let chat_stream = claudecode.chat(summary_session, prompt, Some(true), None).await;
2846 tokio::pin!(chat_stream);
2847
2848 let mut summary = String::new();
2849 while let Some(event) = chat_stream.next().await {
2850 if let crate::activations::claudecode::ChatEvent::Content { text } = event {
2851 summary.push_str(&text);
2852 }
2853 }
2854
2855 Some(summary)
2856}