1use crate::activations::lattice::{LatticeStorage, NodeSpec, NodeStatus};
2use crate::activations::orcha::OrchaNodeKind;
3use crate::plexus::{Activation, ChildRouter, PlexusError, PlexusStream};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::Stream;
7use plexus_macros::activation;
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use super::storage::PmStorage;
15
16#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
19pub struct PmTicketStatus {
20 pub ticket_id: String,
21 pub node_id: String,
22 pub status: String,
23 pub kind: String,
24 pub label: Option<String>,
25 pub child_graph_id: Option<String>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
29#[serde(tag = "type", rename_all = "snake_case")]
30pub enum PmGraphStatusResult {
31 Ok {
32 graph_id: String,
33 graph_status: String,
34 tickets: Vec<PmTicketStatus>,
35 },
36 Err {
37 message: String,
38 },
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
42#[serde(tag = "type", rename_all = "snake_case")]
43pub enum PmWhatNextResult {
44 Ok {
45 graph_id: String,
46 tickets: Vec<PmTicketStatus>,
47 },
48 Err {
49 message: String,
50 },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
54#[serde(tag = "type", rename_all = "snake_case")]
55pub enum PmInspectResult {
56 Ok {
57 ticket_id: String,
58 node_id: String,
59 status: String,
60 kind: String,
61 task: Option<String>,
62 command: Option<String>,
63 output: Option<Value>,
64 error: Option<String>,
65 child_graph_id: Option<String>,
66 },
67 NotFound {
68 ticket_id: String,
69 },
70 Err {
71 message: String,
72 },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
76#[serde(tag = "type", rename_all = "snake_case")]
77pub enum PmWhyBlockedResult {
78 Ok {
79 ticket_id: String,
80 blocked_by: Vec<PmTicketStatus>,
81 },
82 NotBlocked {
83 ticket_id: String,
84 },
85 Err {
86 message: String,
87 },
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
91pub struct PmGraphSummary {
92 pub graph_id: String,
93 pub status: String,
94 pub metadata: Value,
95 pub ticket_count: usize,
96 pub created_at: i64,
97 pub source: Option<String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
102#[serde(tag = "type", rename_all = "snake_case")]
103pub enum PmListGraphsResult {
104 Ok {
105 graphs: Vec<PmGraphSummary>,
106 },
107 Err {
108 message: String,
109 },
110}
111
112#[derive(Clone)]
115pub struct Pm {
116 pm_storage: Arc<PmStorage>,
117 lattice_storage: Arc<LatticeStorage>,
118}
119
120impl Pm {
121 pub fn new(pm_storage: Arc<PmStorage>, lattice_storage: Arc<LatticeStorage>) -> Self {
122 Self { pm_storage, lattice_storage }
123 }
124
125 pub async fn save_ticket_map(
127 &self,
128 graph_id: &str,
129 map: &HashMap<String, String>,
130 ) -> Result<(), String> {
131 self.pm_storage.save_ticket_map(graph_id, map).await
132 }
133
134 pub async fn get_ticket_map(&self, graph_id: &str) -> Result<HashMap<String, String>, String> {
136 self.pm_storage.get_ticket_map(graph_id).await
137 }
138
139 pub async fn list_all_graph_ids(&self) -> Result<Vec<String>, String> {
143 let entries = self.pm_storage.list_ticket_maps(usize::MAX).await?;
144 Ok(entries.into_iter().map(|(id, _)| id).collect())
145 }
146
147 pub async fn save_ticket_source(&self, graph_id: &str, source: &str) -> Result<(), String> {
149 self.pm_storage.save_ticket_source(graph_id, source).await
150 }
151
152 pub async fn get_ticket_source_raw(&self, graph_id: &str) -> Result<Option<String>, String> {
154 self.pm_storage.get_ticket_source(graph_id).await
155 }
156
157 pub async fn log_node_event(
161 &self,
162 graph_id: &str,
163 node_id: &str,
164 ticket_id: Option<&str>,
165 seq: i64,
166 event_type: &str,
167 event_data: serde_json::Value,
168 ) {
169 let data_str = serde_json::to_string(&event_data).unwrap_or_default();
170 if let Err(e) = self.pm_storage
171 .append_node_log(graph_id, node_id, ticket_id, seq, event_type, &data_str)
172 .await
173 {
174 tracing::warn!("log_node_event failed for {}/{}: {}", graph_id, node_id, e);
175 }
176 }
177}
178
179fn node_status_str(status: &NodeStatus) -> &'static str {
182 match status {
183 NodeStatus::Pending => "pending",
184 NodeStatus::Ready => "ready",
185 NodeStatus::Running => "running",
186 NodeStatus::Complete => "complete",
187 NodeStatus::Failed => "failed",
188 }
189}
190
191fn extract_kind_and_label(spec: &NodeSpec) -> (String, Option<String>) {
192 match spec {
193 NodeSpec::Task { data, .. } => {
194 match serde_json::from_value::<OrchaNodeKind>(data.clone()) {
195 Ok(OrchaNodeKind::Task { task, .. }) => {
196 let label = task.chars().take(80).collect::<String>();
197 ("task".to_string(), Some(label))
198 }
199 Ok(OrchaNodeKind::Synthesize { task, .. }) => {
200 let label = task.chars().take(80).collect::<String>();
201 ("synthesize".to_string(), Some(label))
202 }
203 Ok(OrchaNodeKind::Validate { command, .. }) => {
204 let label = command.chars().take(80).collect::<String>();
205 ("validate".to_string(), Some(label))
206 }
207 Ok(OrchaNodeKind::Review { prompt }) => {
208 let label = prompt.chars().take(80).collect::<String>();
209 ("review".to_string(), Some(label))
210 }
211 Ok(OrchaNodeKind::Plan { task }) => {
212 let label = task.chars().take(80).collect::<String>();
213 ("plan".to_string(), Some(label))
214 }
215 Err(_) => ("task".to_string(), None),
216 }
217 }
218 NodeSpec::Gather { .. } => ("gather".to_string(), None),
219 NodeSpec::Scatter { .. } => ("scatter".to_string(), None),
220 NodeSpec::SubGraph { .. } => ("subgraph".to_string(), None),
221 }
222}
223
224#[plexus_macros::activation(namespace = "pm",
227version = "1.0.0",
228description = "Project management view of orcha graph execution in ticket vocabulary", crate_path = "plexus_core")]
229impl Pm {
230 #[plexus_macros::method(params(
232 graph_id = "The lattice graph ID returned by build_tickets or run_tickets",
233 recursive = "Optional: when true, include child_graph_id from completed node outputs (default false)"
234 ))]
235 async fn graph_status(
236 &self,
237 graph_id: String,
238 recursive: Option<bool>,
239 ) -> impl Stream<Item = PmGraphStatusResult> + Send + 'static {
240 let pm_storage = self.pm_storage.clone();
241 let lattice_storage = self.lattice_storage.clone();
242
243 stream! {
244 let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
245 Ok(m) => m,
246 Err(e) => { yield PmGraphStatusResult::Err { message: e }; return; }
247 };
248
249 let mut tickets = Vec::new();
250 let mut has_pending = false;
251 let mut has_ready = false;
252 let mut has_running = false;
253 let mut has_failed = false;
254 let mut all_complete = true;
255
256 for (ticket_id, node_id) in &ticket_map {
257 match lattice_storage.get_node(node_id).await {
258 Ok(node) => {
259 match node.status {
260 NodeStatus::Pending => { has_pending = true; all_complete = false; }
261 NodeStatus::Ready => { has_ready = true; all_complete = false; }
262 NodeStatus::Running => { has_running = true; all_complete = false; }
263 NodeStatus::Failed => { has_failed = true; all_complete = false; }
264 NodeStatus::Complete => {}
265 }
266 let (kind, label) = extract_kind_and_label(&node.spec);
267 let child_graph_id = if recursive.unwrap_or(false) && node.status == NodeStatus::Complete {
268 node.output.as_ref().and_then(|o| {
269 if let crate::activations::lattice::NodeOutput::Single(token) = o {
270 if let Some(crate::activations::lattice::TokenPayload::Data { value }) = &token.payload {
271 value.get("child_graph_id").and_then(|v| v.as_str()).map(|s| s.to_string())
272 } else { None }
273 } else { None }
274 })
275 } else {
276 None
277 };
278 tickets.push(PmTicketStatus {
279 ticket_id: ticket_id.clone(),
280 node_id: node_id.clone(),
281 status: node_status_str(&node.status).to_string(),
282 kind,
283 label,
284 child_graph_id,
285 });
286 }
287 Err(e) => {
288 yield PmGraphStatusResult::Err {
289 message: format!("Failed to get node {}: {}", node_id, e),
290 };
291 return;
292 }
293 }
294 }
295
296 let graph_status = if has_failed {
297 "failed"
298 } else if has_running || has_ready {
299 "running"
300 } else if has_pending {
301 "pending"
302 } else if all_complete && !ticket_map.is_empty() {
303 "complete"
304 } else {
305 "pending"
306 };
307
308 yield PmGraphStatusResult::Ok {
309 graph_id,
310 graph_status: graph_status.to_string(),
311 tickets,
312 };
313 }
314 }
315
316 #[plexus_macros::method(params(
318 graph_id = "The lattice graph ID returned by build_tickets or run_tickets"
319 ))]
320 async fn what_next(
321 &self,
322 graph_id: String,
323 ) -> impl Stream<Item = PmWhatNextResult> + Send + 'static {
324 let pm_storage = self.pm_storage.clone();
325 let lattice_storage = self.lattice_storage.clone();
326
327 stream! {
328 let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
329 Ok(m) => m,
330 Err(e) => { yield PmWhatNextResult::Err { message: e }; return; }
331 };
332
333 let mut tickets = Vec::new();
334 for (ticket_id, node_id) in &ticket_map {
335 match lattice_storage.get_node(node_id).await {
336 Ok(node) => {
337 if matches!(node.status, NodeStatus::Ready | NodeStatus::Running) {
338 let (kind, label) = extract_kind_and_label(&node.spec);
339 tickets.push(PmTicketStatus {
340 ticket_id: ticket_id.clone(),
341 node_id: node_id.clone(),
342 status: node_status_str(&node.status).to_string(),
343 kind,
344 label,
345 child_graph_id: None,
346 });
347 }
348 }
349 Err(e) => {
350 yield PmWhatNextResult::Err {
351 message: format!("Failed to get node {}: {}", node_id, e),
352 };
353 return;
354 }
355 }
356 }
357
358 yield PmWhatNextResult::Ok { graph_id, tickets };
359 }
360 }
361
362 #[plexus_macros::method(params(
364 graph_id = "The lattice graph ID returned by build_tickets or run_tickets",
365 ticket_id = "The ticket ID (as used in the ticket file)"
366 ))]
367 async fn inspect_ticket(
368 &self,
369 graph_id: String,
370 ticket_id: String,
371 ) -> impl Stream<Item = PmInspectResult> + Send + 'static {
372 let pm_storage = self.pm_storage.clone();
373 let lattice_storage = self.lattice_storage.clone();
374
375 stream! {
376 let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
377 Ok(m) => m,
378 Err(e) => { yield PmInspectResult::Err { message: e }; return; }
379 };
380
381 let node_id = match ticket_map.get(&ticket_id) {
382 Some(id) => id.clone(),
383 None => { yield PmInspectResult::NotFound { ticket_id }; return; }
384 };
385
386 let node = match lattice_storage.get_node(&node_id).await {
387 Ok(n) => n,
388 Err(e) => {
389 yield PmInspectResult::Err {
390 message: format!("Failed to get node: {}", e),
391 };
392 return;
393 }
394 };
395
396 let status = node_status_str(&node.status).to_string();
397 let output = node.output.as_ref()
398 .map(|o| serde_json::to_value(o).unwrap_or(Value::Null));
399 let error = node.error.clone();
400
401 let child_graph_id = output.as_ref()
402 .and_then(|o| o.get("payload"))
403 .and_then(|p| p.get("value"))
404 .and_then(|v| v.get("child_graph_id"))
405 .and_then(|id| id.as_str())
406 .map(|s| s.to_string());
407
408 match &node.spec {
409 NodeSpec::Task { data, .. } => {
410 match serde_json::from_value::<OrchaNodeKind>(data.clone()) {
411 Ok(OrchaNodeKind::Task { task, .. }) => {
412 yield PmInspectResult::Ok {
413 ticket_id, node_id, status,
414 kind: "task".to_string(),
415 task: Some(task), command: None, output, error,
416 child_graph_id,
417 };
418 }
419 Ok(OrchaNodeKind::Synthesize { task, .. }) => {
420 yield PmInspectResult::Ok {
421 ticket_id, node_id, status,
422 kind: "synthesize".to_string(),
423 task: Some(task), command: None, output, error,
424 child_graph_id,
425 };
426 }
427 Ok(OrchaNodeKind::Validate { command, .. }) => {
428 yield PmInspectResult::Ok {
429 ticket_id, node_id, status,
430 kind: "validate".to_string(),
431 task: None, command: Some(command), output, error,
432 child_graph_id,
433 };
434 }
435 Ok(OrchaNodeKind::Review { prompt }) => {
436 yield PmInspectResult::Ok {
437 ticket_id, node_id, status,
438 kind: "review".to_string(),
439 task: Some(prompt), command: None, output, error,
440 child_graph_id,
441 };
442 }
443 Ok(OrchaNodeKind::Plan { task }) => {
444 yield PmInspectResult::Ok {
445 ticket_id, node_id, status,
446 kind: "plan".to_string(),
447 task: Some(task), command: None, output, error,
448 child_graph_id,
449 };
450 }
451 Err(_) => {
452 yield PmInspectResult::Ok {
453 ticket_id, node_id, status,
454 kind: "task".to_string(),
455 task: None, command: None, output, error,
456 child_graph_id,
457 };
458 }
459 }
460 }
461 NodeSpec::Gather { .. } => {
462 yield PmInspectResult::Ok {
463 ticket_id, node_id, status,
464 kind: "gather".to_string(),
465 task: None, command: None, output, error,
466 child_graph_id,
467 };
468 }
469 _ => {
470 yield PmInspectResult::Ok {
471 ticket_id, node_id, status,
472 kind: "other".to_string(),
473 task: None, command: None, output, error,
474 child_graph_id,
475 };
476 }
477 }
478 }
479 }
480
481 #[plexus_macros::method(params(
483 graph_id = "The lattice graph ID returned by build_tickets or run_tickets",
484 ticket_id = "The ticket ID to investigate"
485 ))]
486 async fn why_blocked(
487 &self,
488 graph_id: String,
489 ticket_id: String,
490 ) -> impl Stream<Item = PmWhyBlockedResult> + Send + 'static {
491 let pm_storage = self.pm_storage.clone();
492 let lattice_storage = self.lattice_storage.clone();
493
494 stream! {
495 let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
496 Ok(m) => m,
497 Err(e) => { yield PmWhyBlockedResult::Err { message: e }; return; }
498 };
499
500 let node_id = match ticket_map.get(&ticket_id) {
501 Some(id) => id.clone(),
502 None => {
503 yield PmWhyBlockedResult::Err {
504 message: format!("Ticket not found: {}", ticket_id),
505 };
506 return;
507 }
508 };
509
510 let predecessors = match lattice_storage.get_inbound_edges(&node_id).await {
511 Ok(p) => p,
512 Err(e) => {
513 yield PmWhyBlockedResult::Err {
514 message: format!("Failed to get predecessors: {}", e),
515 };
516 return;
517 }
518 };
519
520 let mut blocked_by = Vec::new();
521 for pred_id in predecessors {
522 let pred_node = match lattice_storage.get_node(&pred_id).await {
523 Ok(n) => n,
524 Err(_) => continue,
525 };
526
527 if pred_node.status == NodeStatus::Complete {
528 continue;
529 }
530
531 let pred_ticket_id = pm_storage
532 .get_ticket_for_node(&graph_id, &pred_id)
533 .await
534 .unwrap_or(None)
535 .unwrap_or_else(|| pred_id.clone());
536
537 let (kind, label) = extract_kind_and_label(&pred_node.spec);
538 blocked_by.push(PmTicketStatus {
539 ticket_id: pred_ticket_id,
540 node_id: pred_id,
541 status: node_status_str(&pred_node.status).to_string(),
542 kind,
543 label,
544 child_graph_id: None,
545 });
546 }
547
548 if blocked_by.is_empty() {
549 yield PmWhyBlockedResult::NotBlocked { ticket_id };
550 } else {
551 yield PmWhyBlockedResult::Ok { ticket_id, blocked_by };
552 }
553 }
554 }
555
556 #[plexus_macros::method(params(
558 graph_id = "The lattice graph ID"
559 ))]
560 async fn get_ticket_source(
561 &self,
562 graph_id: String,
563 ) -> impl Stream<Item = Value> + Send + 'static {
564 let pm_storage = self.pm_storage.clone();
565 stream! {
566 match pm_storage.get_ticket_source(&graph_id).await {
567 Ok(Some(source)) => yield serde_json::json!({ "type": "ok", "source": source }),
568 Ok(None) => yield serde_json::json!({ "type": "not_found", "graph_id": graph_id }),
569 Err(e) => yield serde_json::json!({ "type": "err", "message": e }),
570 }
571 }
572 }
573
574 #[plexus_macros::method(params(
576 project = "Optional: filter by metadata.project string",
577 limit = "Optional: max results (default 20)",
578 root_only = "Optional: when true (default), only return root graphs (no parent); set false to include subgraphs",
579 status = "Optional: filter by graph status (running, complete, failed)"
580 ))]
581 async fn list_graphs(
582 &self,
583 project: Option<String>,
584 limit: Option<usize>,
585 root_only: Option<bool>,
586 status: Option<String>,
587 ) -> impl Stream<Item = PmListGraphsResult> + Send + 'static {
588 let pm_storage = self.pm_storage.clone();
589 let lattice_storage = self.lattice_storage.clone();
590
591 stream! {
592 let limit = limit.unwrap_or(20);
593
594 let entries = match pm_storage.list_ticket_maps(limit).await {
595 Ok(v) => v,
596 Err(e) => {
597 yield PmListGraphsResult::Err { message: e };
598 return;
599 }
600 };
601
602 let mut graphs = Vec::new();
603
604 for (graph_id, created_at) in entries {
605 let lattice_graph = match lattice_storage.get_graph(&graph_id).await {
606 Ok(g) => g,
607 Err(_) => continue,
608 };
609
610 if root_only.unwrap_or(true) && lattice_graph.parent_graph_id.is_some() {
612 continue;
613 }
614
615 if let Some(ref status_filter) = status {
617 if lattice_graph.status.to_string() != *status_filter {
618 continue;
619 }
620 }
621
622 if let Some(ref project_filter) = project {
624 let graph_project = lattice_graph.metadata.get("project")
625 .and_then(|v| v.as_str())
626 .unwrap_or("");
627 if graph_project != project_filter.as_str() {
628 continue;
629 }
630 }
631
632 let ticket_map = match pm_storage.get_ticket_map(&graph_id).await {
633 Ok(m) => m,
634 Err(_) => HashMap::new(),
635 };
636
637 let status = lattice_graph.status.to_string();
638
639 let source = pm_storage.get_ticket_source(&graph_id).await
640 .ok()
641 .flatten()
642 .map(|s: String| {
643 let trimmed = s.trim().to_string();
645 if trimmed.len() > 200 {
646 format!("{}…", &trimmed[..197])
647 } else {
648 trimmed
649 }
650 });
651
652 graphs.push(PmGraphSummary {
653 graph_id,
654 status,
655 metadata: lattice_graph.metadata,
656 ticket_count: ticket_map.len(),
657 created_at,
658 source,
659 });
660 }
661
662 yield PmListGraphsResult::Ok { graphs };
663 }
664 }
665
666 #[plexus_macros::method(params(
674 graph_id = "Graph ID (from GraphStarted event or pm.list_graphs)",
675 node_id = "Lattice node ID (from NodeStarted event or pm.graph_status)"
676 ))]
677 async fn get_node_log(
678 &self,
679 graph_id: String,
680 node_id: String,
681 ) -> impl Stream<Item = Value> + Send + 'static {
682 let pm_storage = self.pm_storage.clone();
683 stream! {
684 match pm_storage.get_node_log(&graph_id, &node_id).await {
685 Ok(entries) => {
686 for entry in entries {
687 let data: Value = serde_json::from_str(&entry.event_data)
688 .unwrap_or(serde_json::json!({ "raw": entry.event_data }));
689 yield serde_json::json!({
690 "seq": entry.seq,
691 "event_type": entry.event_type,
692 "data": data,
693 "created_at": entry.created_at,
694 });
695 }
696 }
697 Err(e) => {
698 yield serde_json::json!({ "type": "err", "message": e });
699 }
700 }
701 }
702 }
703}