Skip to main content

plexus_substrate/activations/orcha/pm/
activation.rs

1use crate::activations::lattice::{LatticeStorage, NodeSpec, NodeStatus};
2use crate::activations::orcha::OrchaNodeKind;
3use crate::plexus::{Activation, ChildRouter, PlexusError, PlexusStream};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::Stream;
7use plexus_macros::activation;
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use super::storage::PmStorage;
15
16// ─── Result types ─────────────────────────────────────────────────────────────
17
18#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
19pub struct PmTicketStatus {
20    pub ticket_id: String,
21    pub node_id: String,
22    pub status: String,
23    pub kind: String,
24    pub label: Option<String>,
25    pub child_graph_id: Option<String>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
29#[serde(tag = "type", rename_all = "snake_case")]
30pub enum PmGraphStatusResult {
31    Ok {
32        graph_id: String,
33        graph_status: String,
34        tickets: Vec<PmTicketStatus>,
35    },
36    Err {
37        message: String,
38    },
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
42#[serde(tag = "type", rename_all = "snake_case")]
43pub enum PmWhatNextResult {
44    Ok {
45        graph_id: String,
46        tickets: Vec<PmTicketStatus>,
47    },
48    Err {
49        message: String,
50    },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
54#[serde(tag = "type", rename_all = "snake_case")]
55pub enum PmInspectResult {
56    Ok {
57        ticket_id: String,
58        node_id: String,
59        status: String,
60        kind: String,
61        task: Option<String>,
62        command: Option<String>,
63        output: Option<Value>,
64        error: Option<String>,
65        child_graph_id: Option<String>,
66    },
67    NotFound {
68        ticket_id: String,
69    },
70    Err {
71        message: String,
72    },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
76#[serde(tag = "type", rename_all = "snake_case")]
77pub enum PmWhyBlockedResult {
78    Ok {
79        ticket_id: String,
80        blocked_by: Vec<PmTicketStatus>,
81    },
82    NotBlocked {
83        ticket_id: String,
84    },
85    Err {
86        message: String,
87    },
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
91pub struct PmGraphSummary {
92    pub graph_id: String,
93    pub status: String,
94    pub metadata: Value,
95    pub ticket_count: usize,
96    pub created_at: i64,
97    /// Original task description passed to run_plan / run_tickets (first 200 chars).
98    pub source: Option<String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
102#[serde(tag = "type", rename_all = "snake_case")]
103pub enum PmListGraphsResult {
104    Ok {
105        graphs: Vec<PmGraphSummary>,
106    },
107    Err {
108        message: String,
109    },
110}
111
112// ─── Pm activation ────────────────────────────────────────────────────────────
113
114#[derive(Clone)]
115pub struct Pm {
116    pm_storage: Arc<PmStorage>,
117    lattice_storage: Arc<LatticeStorage>,
118}
119
120impl Pm {
121    pub fn new(pm_storage: Arc<PmStorage>, lattice_storage: Arc<LatticeStorage>) -> Self {
122        Self { pm_storage, lattice_storage }
123    }
124
125    /// Save ticket→node mappings for a graph (called by Orcha after build).
126    pub async fn save_ticket_map(
127        &self,
128        graph_id: &str,
129        map: &HashMap<String, String>,
130    ) -> Result<(), String> {
131        self.pm_storage.save_ticket_map(graph_id, map).await
132    }
133
134    /// Fetch the ticket_id→node_id map for a graph.
135    pub async fn get_ticket_map(&self, graph_id: &str) -> Result<HashMap<String, String>, String> {
136        self.pm_storage.get_ticket_map(graph_id).await
137    }
138
139    /// Return all graph IDs known to PM (regardless of status), most-recent first.
140    ///
141    /// Used by the startup recovery pass to find graphs that should be re-watched.
142    pub async fn list_all_graph_ids(&self) -> Result<Vec<String>, String> {
143        let entries = self.pm_storage.list_ticket_maps(usize::MAX).await?;
144        Ok(entries.into_iter().map(|(id, _)| id).collect())
145    }
146
147    /// Save the raw ticket source for a graph (called by run_tickets / run_tickets_async).
148    pub async fn save_ticket_source(&self, graph_id: &str, source: &str) -> Result<(), String> {
149        self.pm_storage.save_ticket_source(graph_id, source).await
150    }
151
152    /// Fetch the raw ticket source for a graph.
153    pub async fn get_ticket_source_raw(&self, graph_id: &str) -> Result<Option<String>, String> {
154        self.pm_storage.get_ticket_source(graph_id).await
155    }
156
157    /// Append a single event to the node execution log.
158    ///
159    /// Called from `dispatch_task` for each ChatEvent and the final outcome.
160    pub async fn log_node_event(
161        &self,
162        graph_id: &str,
163        node_id: &str,
164        ticket_id: Option<&str>,
165        seq: i64,
166        event_type: &str,
167        event_data: serde_json::Value,
168    ) {
169        let data_str = serde_json::to_string(&event_data).unwrap_or_default();
170        if let Err(e) = self.pm_storage
171            .append_node_log(graph_id, node_id, ticket_id, seq, event_type, &data_str)
172            .await
173        {
174            tracing::warn!("log_node_event failed for {}/{}: {}", graph_id, node_id, e);
175        }
176    }
177}
178
179// ─── Helpers ─────────────────────────────────────────────────────────────────
180
181fn node_status_str(status: &NodeStatus) -> &'static str {
182    match status {
183        NodeStatus::Pending => "pending",
184        NodeStatus::Ready => "ready",
185        NodeStatus::Running => "running",
186        NodeStatus::Complete => "complete",
187        NodeStatus::Failed => "failed",
188    }
189}
190
191fn extract_kind_and_label(spec: &NodeSpec) -> (String, Option<String>) {
192    match spec {
193        NodeSpec::Task { data, .. } => {
194            match serde_json::from_value::<OrchaNodeKind>(data.clone()) {
195                Ok(OrchaNodeKind::Task { task, .. }) => {
196                    let label = task.chars().take(80).collect::<String>();
197                    ("task".to_string(), Some(label))
198                }
199                Ok(OrchaNodeKind::Synthesize { task, .. }) => {
200                    let label = task.chars().take(80).collect::<String>();
201                    ("synthesize".to_string(), Some(label))
202                }
203                Ok(OrchaNodeKind::Validate { command, .. }) => {
204                    let label = command.chars().take(80).collect::<String>();
205                    ("validate".to_string(), Some(label))
206                }
207                Ok(OrchaNodeKind::Review { prompt }) => {
208                    let label = prompt.chars().take(80).collect::<String>();
209                    ("review".to_string(), Some(label))
210                }
211                Ok(OrchaNodeKind::Plan { task }) => {
212                    let label = task.chars().take(80).collect::<String>();
213                    ("plan".to_string(), Some(label))
214                }
215                Err(_) => ("task".to_string(), None),
216            }
217        }
218        NodeSpec::Gather { .. } => ("gather".to_string(), None),
219        NodeSpec::Scatter { .. } => ("scatter".to_string(), None),
220        NodeSpec::SubGraph { .. } => ("subgraph".to_string(), None),
221    }
222}
223
224// ─── Hub methods ─────────────────────────────────────────────────────────────
225
226#[plexus_macros::activation(namespace = "pm",
227version = "1.0.0",
228description = "Project management view of orcha graph execution in ticket vocabulary", crate_path = "plexus_core")]
229impl Pm {
230    /// Get the status of all tickets in a graph.
231    #[plexus_macros::method(params(
232        graph_id   = "The lattice graph ID returned by build_tickets or run_tickets",
233        recursive  = "Optional: when true, include child_graph_id from completed node outputs (default false)"
234    ))]
235    async fn graph_status(
236        &self,
237        graph_id: String,
238        recursive: Option<bool>,
239    ) -> impl Stream<Item = PmGraphStatusResult> + Send + 'static {
240        let pm_storage = self.pm_storage.clone();
241        let lattice_storage = self.lattice_storage.clone();
242
243        stream! {
244            let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
245                Ok(m) => m,
246                Err(e) => { yield PmGraphStatusResult::Err { message: e }; return; }
247            };
248
249            let mut tickets = Vec::new();
250            let mut has_pending = false;
251            let mut has_ready = false;
252            let mut has_running = false;
253            let mut has_failed = false;
254            let mut all_complete = true;
255
256            for (ticket_id, node_id) in &ticket_map {
257                match lattice_storage.get_node(node_id).await {
258                    Ok(node) => {
259                        match node.status {
260                            NodeStatus::Pending  => { has_pending  = true; all_complete = false; }
261                            NodeStatus::Ready    => { has_ready    = true; all_complete = false; }
262                            NodeStatus::Running  => { has_running  = true; all_complete = false; }
263                            NodeStatus::Failed   => { has_failed   = true; all_complete = false; }
264                            NodeStatus::Complete => {}
265                        }
266                        let (kind, label) = extract_kind_and_label(&node.spec);
267                        let child_graph_id = if recursive.unwrap_or(false) && node.status == NodeStatus::Complete {
268                            node.output.as_ref().and_then(|o| {
269                                if let crate::activations::lattice::NodeOutput::Single(token) = o {
270                                    if let Some(crate::activations::lattice::TokenPayload::Data { value }) = &token.payload {
271                                        value.get("child_graph_id").and_then(|v| v.as_str()).map(|s| s.to_string())
272                                    } else { None }
273                                } else { None }
274                            })
275                        } else {
276                            None
277                        };
278                        tickets.push(PmTicketStatus {
279                            ticket_id: ticket_id.clone(),
280                            node_id: node_id.clone(),
281                            status: node_status_str(&node.status).to_string(),
282                            kind,
283                            label,
284                            child_graph_id,
285                        });
286                    }
287                    Err(e) => {
288                        yield PmGraphStatusResult::Err {
289                            message: format!("Failed to get node {}: {}", node_id, e),
290                        };
291                        return;
292                    }
293                }
294            }
295
296            let graph_status = if has_failed {
297                "failed"
298            } else if has_running || has_ready {
299                "running"
300            } else if has_pending {
301                "pending"
302            } else if all_complete && !ticket_map.is_empty() {
303                "complete"
304            } else {
305                "pending"
306            };
307
308            yield PmGraphStatusResult::Ok {
309                graph_id,
310                graph_status: graph_status.to_string(),
311                tickets,
312            };
313        }
314    }
315
316    /// Get tickets that are ready or running (next actionable items).
317    #[plexus_macros::method(params(
318        graph_id = "The lattice graph ID returned by build_tickets or run_tickets"
319    ))]
320    async fn what_next(
321        &self,
322        graph_id: String,
323    ) -> impl Stream<Item = PmWhatNextResult> + Send + 'static {
324        let pm_storage = self.pm_storage.clone();
325        let lattice_storage = self.lattice_storage.clone();
326
327        stream! {
328            let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
329                Ok(m) => m,
330                Err(e) => { yield PmWhatNextResult::Err { message: e }; return; }
331            };
332
333            let mut tickets = Vec::new();
334            for (ticket_id, node_id) in &ticket_map {
335                match lattice_storage.get_node(node_id).await {
336                    Ok(node) => {
337                        if matches!(node.status, NodeStatus::Ready | NodeStatus::Running) {
338                            let (kind, label) = extract_kind_and_label(&node.spec);
339                            tickets.push(PmTicketStatus {
340                                ticket_id: ticket_id.clone(),
341                                node_id: node_id.clone(),
342                                status: node_status_str(&node.status).to_string(),
343                                kind,
344                                label,
345                                child_graph_id: None,
346                            });
347                        }
348                    }
349                    Err(e) => {
350                        yield PmWhatNextResult::Err {
351                            message: format!("Failed to get node {}: {}", node_id, e),
352                        };
353                        return;
354                    }
355                }
356            }
357
358            yield PmWhatNextResult::Ok { graph_id, tickets };
359        }
360    }
361
362    /// Inspect a single ticket in detail.
363    #[plexus_macros::method(params(
364        graph_id = "The lattice graph ID returned by build_tickets or run_tickets",
365        ticket_id = "The ticket ID (as used in the ticket file)"
366    ))]
367    async fn inspect_ticket(
368        &self,
369        graph_id: String,
370        ticket_id: String,
371    ) -> impl Stream<Item = PmInspectResult> + Send + 'static {
372        let pm_storage = self.pm_storage.clone();
373        let lattice_storage = self.lattice_storage.clone();
374
375        stream! {
376            let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
377                Ok(m) => m,
378                Err(e) => { yield PmInspectResult::Err { message: e }; return; }
379            };
380
381            let node_id = match ticket_map.get(&ticket_id) {
382                Some(id) => id.clone(),
383                None => { yield PmInspectResult::NotFound { ticket_id }; return; }
384            };
385
386            let node = match lattice_storage.get_node(&node_id).await {
387                Ok(n) => n,
388                Err(e) => {
389                    yield PmInspectResult::Err {
390                        message: format!("Failed to get node: {}", e),
391                    };
392                    return;
393                }
394            };
395
396            let status = node_status_str(&node.status).to_string();
397            let output = node.output.as_ref()
398                .map(|o| serde_json::to_value(o).unwrap_or(Value::Null));
399            let error = node.error.clone();
400
401            let child_graph_id = output.as_ref()
402                .and_then(|o| o.get("payload"))
403                .and_then(|p| p.get("value"))
404                .and_then(|v| v.get("child_graph_id"))
405                .and_then(|id| id.as_str())
406                .map(|s| s.to_string());
407
408            match &node.spec {
409                NodeSpec::Task { data, .. } => {
410                    match serde_json::from_value::<OrchaNodeKind>(data.clone()) {
411                        Ok(OrchaNodeKind::Task { task, .. }) => {
412                            yield PmInspectResult::Ok {
413                                ticket_id, node_id, status,
414                                kind: "task".to_string(),
415                                task: Some(task), command: None, output, error,
416                                child_graph_id,
417                            };
418                        }
419                        Ok(OrchaNodeKind::Synthesize { task, .. }) => {
420                            yield PmInspectResult::Ok {
421                                ticket_id, node_id, status,
422                                kind: "synthesize".to_string(),
423                                task: Some(task), command: None, output, error,
424                                child_graph_id,
425                            };
426                        }
427                        Ok(OrchaNodeKind::Validate { command, .. }) => {
428                            yield PmInspectResult::Ok {
429                                ticket_id, node_id, status,
430                                kind: "validate".to_string(),
431                                task: None, command: Some(command), output, error,
432                                child_graph_id,
433                            };
434                        }
435                        Ok(OrchaNodeKind::Review { prompt }) => {
436                            yield PmInspectResult::Ok {
437                                ticket_id, node_id, status,
438                                kind: "review".to_string(),
439                                task: Some(prompt), command: None, output, error,
440                                child_graph_id,
441                            };
442                        }
443                        Ok(OrchaNodeKind::Plan { task }) => {
444                            yield PmInspectResult::Ok {
445                                ticket_id, node_id, status,
446                                kind: "plan".to_string(),
447                                task: Some(task), command: None, output, error,
448                                child_graph_id,
449                            };
450                        }
451                        Err(_) => {
452                            yield PmInspectResult::Ok {
453                                ticket_id, node_id, status,
454                                kind: "task".to_string(),
455                                task: None, command: None, output, error,
456                                child_graph_id,
457                            };
458                        }
459                    }
460                }
461                NodeSpec::Gather { .. } => {
462                    yield PmInspectResult::Ok {
463                        ticket_id, node_id, status,
464                        kind: "gather".to_string(),
465                        task: None, command: None, output, error,
466                        child_graph_id,
467                    };
468                }
469                _ => {
470                    yield PmInspectResult::Ok {
471                        ticket_id, node_id, status,
472                        kind: "other".to_string(),
473                        task: None, command: None, output, error,
474                        child_graph_id,
475                    };
476                }
477            }
478        }
479    }
480
481    /// Explain why a ticket is blocked.
482    #[plexus_macros::method(params(
483        graph_id = "The lattice graph ID returned by build_tickets or run_tickets",
484        ticket_id = "The ticket ID to investigate"
485    ))]
486    async fn why_blocked(
487        &self,
488        graph_id: String,
489        ticket_id: String,
490    ) -> impl Stream<Item = PmWhyBlockedResult> + Send + 'static {
491        let pm_storage = self.pm_storage.clone();
492        let lattice_storage = self.lattice_storage.clone();
493
494        stream! {
495            let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
496                Ok(m) => m,
497                Err(e) => { yield PmWhyBlockedResult::Err { message: e }; return; }
498            };
499
500            let node_id = match ticket_map.get(&ticket_id) {
501                Some(id) => id.clone(),
502                None => {
503                    yield PmWhyBlockedResult::Err {
504                        message: format!("Ticket not found: {}", ticket_id),
505                    };
506                    return;
507                }
508            };
509
510            let predecessors = match lattice_storage.get_inbound_edges(&node_id).await {
511                Ok(p) => p,
512                Err(e) => {
513                    yield PmWhyBlockedResult::Err {
514                        message: format!("Failed to get predecessors: {}", e),
515                    };
516                    return;
517                }
518            };
519
520            let mut blocked_by = Vec::new();
521            for pred_id in predecessors {
522                let pred_node = match lattice_storage.get_node(&pred_id).await {
523                    Ok(n) => n,
524                    Err(_) => continue,
525                };
526
527                if pred_node.status == NodeStatus::Complete {
528                    continue;
529                }
530
531                let pred_ticket_id = pm_storage
532                    .get_ticket_for_node(&graph_id, &pred_id)
533                    .await
534                    .unwrap_or(None)
535                    .unwrap_or_else(|| pred_id.clone());
536
537                let (kind, label) = extract_kind_and_label(&pred_node.spec);
538                blocked_by.push(PmTicketStatus {
539                    ticket_id: pred_ticket_id,
540                    node_id: pred_id,
541                    status: node_status_str(&pred_node.status).to_string(),
542                    kind,
543                    label,
544                    child_graph_id: None,
545                });
546            }
547
548            if blocked_by.is_empty() {
549                yield PmWhyBlockedResult::NotBlocked { ticket_id };
550            } else {
551                yield PmWhyBlockedResult::Ok { ticket_id, blocked_by };
552            }
553        }
554    }
555
556    /// Get the raw ticket source for a graph.
557    #[plexus_macros::method(params(
558        graph_id = "The lattice graph ID"
559    ))]
560    async fn get_ticket_source(
561        &self,
562        graph_id: String,
563    ) -> impl Stream<Item = Value> + Send + 'static {
564        let pm_storage = self.pm_storage.clone();
565        stream! {
566            match pm_storage.get_ticket_source(&graph_id).await {
567                Ok(Some(source)) => yield serde_json::json!({ "type": "ok", "source": source }),
568                Ok(None) => yield serde_json::json!({ "type": "not_found", "graph_id": graph_id }),
569                Err(e) => yield serde_json::json!({ "type": "err", "message": e }),
570            }
571        }
572    }
573
574    /// List graphs tracked by the pm layer, optionally filtered by project metadata.
575    #[plexus_macros::method(params(
576        project   = "Optional: filter by metadata.project string",
577        limit     = "Optional: max results (default 20)",
578        root_only = "Optional: when true (default), only return root graphs (no parent); set false to include subgraphs",
579        status    = "Optional: filter by graph status (running, complete, failed)"
580    ))]
581    async fn list_graphs(
582        &self,
583        project: Option<String>,
584        limit: Option<usize>,
585        root_only: Option<bool>,
586        status: Option<String>,
587    ) -> impl Stream<Item = PmListGraphsResult> + Send + 'static {
588        let pm_storage = self.pm_storage.clone();
589        let lattice_storage = self.lattice_storage.clone();
590
591        stream! {
592            let limit = limit.unwrap_or(20);
593
594            let entries = match pm_storage.list_ticket_maps(limit).await {
595                Ok(v) => v,
596                Err(e) => {
597                    yield PmListGraphsResult::Err { message: e };
598                    return;
599                }
600            };
601
602            let mut graphs = Vec::new();
603
604            for (graph_id, created_at) in entries {
605                let lattice_graph = match lattice_storage.get_graph(&graph_id).await {
606                    Ok(g) => g,
607                    Err(_) => continue,
608                };
609
610                // Apply root_only filter (default true — skip child graphs).
611                if root_only.unwrap_or(true) && lattice_graph.parent_graph_id.is_some() {
612                    continue;
613                }
614
615                // Apply optional status filter.
616                if let Some(ref status_filter) = status {
617                    if lattice_graph.status.to_string() != *status_filter {
618                        continue;
619                    }
620                }
621
622                // Apply optional project filter.
623                if let Some(ref project_filter) = project {
624                    let graph_project = lattice_graph.metadata.get("project")
625                        .and_then(|v| v.as_str())
626                        .unwrap_or("");
627                    if graph_project != project_filter.as_str() {
628                        continue;
629                    }
630                }
631
632                let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
633                    Ok(m) => m,
634                    Err(_) => HashMap::new(),
635                };
636
637                let status = lattice_graph.status.to_string();
638
639                let source = pm_storage.get_ticket_source(&graph_id).await
640                    .ok()
641                    .flatten()
642                    .map(|s: String| {
643                        // Truncate to 200 chars for summary display
644                        let trimmed = s.trim().to_string();
645                        if trimmed.len() > 200 {
646                            format!("{}…", &trimmed[..197])
647                        } else {
648                            trimmed
649                        }
650                    });
651
652                graphs.push(PmGraphSummary {
653                    graph_id,
654                    status,
655                    metadata: lattice_graph.metadata,
656                    ticket_count: ticket_map.len(),
657                    created_at,
658                    source,
659                });
660            }
661
662            yield PmListGraphsResult::Ok { graphs };
663        }
664    }
665
666    /// Retrieve the full execution log for a node.
667    ///
668    /// Returns all events recorded by `dispatch_task` in sequence order:
669    /// "prompt" (task sent to Claude), "start" (session created), "tool_use",
670    /// "tool_result", "complete", "error", "passthrough", "outcome" (final result).
671    ///
672    /// Use this to diagnose why a node failed or produced unexpected output.
673    #[plexus_macros::method(params(
674        graph_id = "Graph ID (from GraphStarted event or pm.list_graphs)",
675        node_id  = "Lattice node ID (from NodeStarted event or pm.graph_status)"
676    ))]
677    async fn get_node_log(
678        &self,
679        graph_id: String,
680        node_id: String,
681    ) -> impl Stream<Item = Value> + Send + 'static {
682        let pm_storage = self.pm_storage.clone();
683        stream! {
684            match pm_storage.get_node_log(&graph_id, &node_id).await {
685                Ok(entries) => {
686                    for entry in entries {
687                        let data: Value = serde_json::from_str(&entry.event_data)
688                            .unwrap_or(serde_json::json!({ "raw": entry.event_data }));
689                        yield serde_json::json!({
690                            "seq": entry.seq,
691                            "event_type": entry.event_type,
692                            "data": data,
693                            "created_at": entry.created_at,
694                        });
695                    }
696                }
697                Err(e) => {
698                    yield serde_json::json!({ "type": "err", "message": e });
699                }
700            }
701        }
702    }
703}