1use chrono::{DateTime, Utc};
28pub use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct WorkflowDefinition {
41 pub id: String,
42 pub steps: Vec<WorkflowStep>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub input_schema: Option<serde_json::Value>,
46 #[serde(default)]
48 pub checkpoint: CheckpointStrategy,
49 #[serde(default, skip_serializing_if = "Vec::is_empty")]
51 pub entry_points: Vec<EntryPoint>,
52}
53
/// Slash commands considered built in.
///
/// Every entry is lowercase and carries its leading `/`.
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    // connection lifecycle
    "/start",
    "/stop",
    "/disconnect",
    "/reset",
    // session / thread management
    "/new",
    "/newsession",
    "/newthread",
    // diagnostics
    "/status",
    "/debug",
    "/verbose",
    "/help",
    // navigation
    "/switch",
    "/workspace",
    "/context",
    "/ctx",
];
73
74impl WorkflowDefinition {
75 pub fn new(steps: Vec<WorkflowStep>) -> Self {
76 Self {
77 id: uuid::Uuid::new_v4().to_string(),
78 steps,
79 input_schema: None,
80 checkpoint: CheckpointStrategy::default(),
81 entry_points: vec![],
82 }
83 }
84
85 pub fn with_id(mut self, id: &str) -> Self {
86 self.id = id.to_string();
87 self
88 }
89
90 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91 self.checkpoint = strategy;
92 self
93 }
94
95 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96 self.entry_points = entry_points;
97 self
98 }
99
100 pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102 self.entry_points.iter().find(|ep| ep.id == id)
103 }
104
105 pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109 use std::collections::{HashSet, VecDeque};
110
111 let mut reachable = HashSet::new();
112 let mut queue = VecDeque::new();
113 queue.push_back(start_step_id.to_string());
114
115 while let Some(current) = queue.pop_front() {
116 if !reachable.insert(current.clone()) {
117 continue;
118 }
119 for step in &self.steps {
120 if step.depends_on.contains(¤t) && !reachable.contains(&step.id) {
121 queue.push_back(step.id.clone());
122 }
123 }
124 }
125
126 reachable
127 }
128
129 pub fn validate_channel_surface(&self) -> Result<(), String> {
132 use distri_types::WorkflowTrigger;
133 use std::collections::HashSet;
134
135 let step_ids: HashSet<&str> =
136 self.steps.iter().map(|s| s.id.as_str()).collect();
137 let mut slash_names: HashSet<String> = HashSet::new();
138 let mut callback_ids: HashSet<String> = HashSet::new();
139 let mut message_count = 0usize;
140
141 for ep in &self.entry_points {
142 if !step_ids.contains(ep.starts_at.as_str()) {
143 return Err(format!(
144 "entry point '{}' starts_at unknown step '{}'",
145 ep.id, ep.starts_at
146 ));
147 }
148 for trigger in &ep.triggers {
149 match trigger {
150 WorkflowTrigger::Slash { name, aliases, .. } => {
151 for n in std::iter::once(name).chain(aliases.iter()) {
152 let lower = n.to_lowercase();
153 if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
154 return Err(format!(
155 "slash command '{n}' shadows a built-in command"
156 ));
157 }
158 if !slash_names.insert(lower.clone()) {
159 return Err(format!(
160 "entry point '{}': slash command '{}' is already declared",
161 ep.id, n
162 ));
163 }
164 }
165 }
166 WorkflowTrigger::Callback { id, .. } => {
167 if !callback_ids.insert(id.clone()) {
168 return Err(format!(
169 "entry point '{}': callback id '{}' is already declared",
170 ep.id, id
171 ));
172 }
173 }
174 WorkflowTrigger::Message {} => message_count += 1,
175 _ => {}
178 }
179 }
180 }
181 if message_count > 1 {
182 return Err(format!(
183 "workflow declares {message_count} message catch-all entry \
184 points; at most one is allowed"
185 ));
186 }
187 for step in &self.steps {
188 if let StepKind::Reply {
189 buttons_from,
190 button_template,
191 ..
192 } = &step.kind
193 {
194 if button_template.is_some() != buttons_from.is_some() {
195 return Err(format!(
196 "reply step '{}': button_template and buttons_from \
197 must be set together",
198 step.id
199 ));
200 }
201 }
202 }
203 Ok(())
204 }
205
206 pub fn detect_cycles(&self) -> Result<(), String> {
209 use std::collections::{HashMap, HashSet};
210
211 let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
212 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
213 for step in &self.steps {
214 adj.insert(
215 step.id.as_str(),
216 step.depends_on.iter().map(|s| s.as_str()).collect(),
217 );
218 }
219
220 let mut visited = HashSet::new();
221 let mut in_stack = HashSet::new();
222
223 fn dfs<'a>(
224 node: &'a str,
225 adj: &HashMap<&'a str, Vec<&'a str>>,
226 visited: &mut HashSet<&'a str>,
227 in_stack: &mut HashSet<&'a str>,
228 path: &mut Vec<&'a str>,
229 ) -> Result<(), String> {
230 visited.insert(node);
231 in_stack.insert(node);
232 path.push(node);
233
234 if let Some(deps) = adj.get(node) {
235 for &dep in deps {
236 if !visited.contains(dep) {
237 dfs(dep, adj, visited, in_stack, path)?;
238 } else if in_stack.contains(dep) {
239 let cycle_start = path.iter().position(|&n| n == dep).unwrap();
240 let cycle: Vec<&str> = path[cycle_start..].to_vec();
241 return Err(format!(
242 "Circular dependency detected: {} → {}",
243 cycle.join(" → "),
244 dep
245 ));
246 }
247 }
248 }
249
250 in_stack.remove(node);
251 path.pop();
252 Ok(())
253 }
254
255 let mut path = Vec::new();
256 for step in &self.steps {
257 if !visited.contains(step.id.as_str()) {
258 dfs(
259 step.id.as_str(),
260 &adj,
261 &mut visited,
262 &mut in_stack,
263 &mut path,
264 )?;
265 }
266 }
267
268 for step in &self.steps {
270 for dep in &step.depends_on {
271 if !step_ids.contains(dep.as_str()) {
272 return Err(format!(
273 "Step '{}' depends on '{}' which does not exist",
274 step.id, dep
275 ));
276 }
277 }
278 }
279
280 Ok(())
281 }
282}
283
284
285fn default_empty_object() -> serde_json::Value {
290 serde_json::json!({})
291}
292
293fn default_now() -> DateTime<Utc> {
294 Utc::now()
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct WorkflowRun {
312 #[serde(flatten)]
313 pub definition: WorkflowDefinition,
314 #[serde(default)]
315 pub status: TaskStatus,
316 #[serde(default)]
317 pub current_step: usize,
318 #[serde(default = "default_empty_object")]
319 pub context: serde_json::Value,
320 #[serde(default)]
321 pub notes: Vec<WorkflowNote>,
322 #[serde(default)]
323 pub step_runs: Vec<WorkflowStepRun>,
324 #[serde(default = "default_now")]
325 pub created_at: DateTime<Utc>,
326 #[serde(default = "default_now")]
327 pub updated_at: DateTime<Utc>,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize, Default)]
332pub struct WorkflowStepRun {
333 pub step_id: String,
334 #[serde(default)]
335 pub status: TaskStatus,
336 #[serde(default, skip_serializing_if = "Option::is_none")]
337 pub result: Option<serde_json::Value>,
338 #[serde(default, skip_serializing_if = "Option::is_none")]
339 pub error: Option<String>,
340 #[serde(default, skip_serializing_if = "Option::is_none")]
341 pub started_at: Option<DateTime<Utc>>,
342 #[serde(default, skip_serializing_if = "Option::is_none")]
343 pub completed_at: Option<DateTime<Utc>>,
344}
345
346impl WorkflowRun {
347 pub fn new(definition: WorkflowDefinition) -> Self {
350 let step_runs = definition
351 .steps
352 .iter()
353 .map(|s| WorkflowStepRun {
354 step_id: s.id.clone(),
355 ..Default::default()
356 })
357 .collect();
358 Self {
359 definition,
360 status: TaskStatus::Pending,
361 current_step: 0,
362 context: serde_json::json!({}),
363 notes: vec![],
364 step_runs,
365 created_at: Utc::now(),
366 updated_at: Utc::now(),
367 }
368 }
369
370 pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
373 Self::new(WorkflowDefinition::new(steps))
374 }
375
376 pub fn with_context(mut self, context: serde_json::Value) -> Self {
377 self.context = context;
378 self
379 }
380
381 pub fn with_id(mut self, id: &str) -> Self {
382 self.definition.id = id.to_string();
383 self
384 }
385
386 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
387 self.definition.checkpoint = strategy;
388 self
389 }
390
391 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
392 self.definition.entry_points = entry_points;
393 self
394 }
395
396 pub fn id(&self) -> &str {
397 &self.definition.id
398 }
399
400 pub fn steps(&self) -> &[WorkflowStep] {
401 &self.definition.steps
402 }
403
404 pub fn step(&self, idx: usize) -> &WorkflowStep {
405 &self.definition.steps[idx]
406 }
407
408 pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
409 &self.step_runs[idx]
410 }
411
412 pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
413 &mut self.step_runs[idx]
414 }
415
416 pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
417 self.step_runs.iter().find(|s| s.step_id == step_id)
418 }
419
420 pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
421 self.step_runs.iter_mut().find(|s| s.step_id == step_id)
422 }
423
424 pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
428 let ep = self
429 .definition
430 .entry_points
431 .iter()
432 .find(|ep| ep.id == entry_point_id)
433 .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
434 .clone();
435
436 if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
437 return Err(format!(
438 "Entry point '{}' references step '{}' which does not exist",
439 entry_point_id, ep.starts_at
440 ));
441 }
442
443 let reachable = self.definition.reachable_from(&ep.starts_at);
444
445 for (i, step) in self.definition.steps.iter().enumerate() {
446 if !reachable.contains(&step.id) {
447 self.step_runs[i].status = TaskStatus::Canceled;
448 if let Some(result) = ep.preset_results.get(&step.id) {
449 self.step_runs[i].result = Some(result.clone());
450 }
451 }
452 }
453
454 if let Some(ctx) = self.context.as_object_mut() {
455 let steps = ctx
456 .entry("steps")
457 .or_insert(serde_json::json!({}))
458 .as_object_mut()
459 .expect("steps must be an object");
460 for (step_id, result) in &ep.preset_results {
461 steps.insert(step_id.clone(), result.clone());
462 }
463 }
464
465 Ok(self)
466 }
467
468 pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
472 if let Some(ref schema_value) = self.definition.input_schema {
473 let validator = jsonschema::validator_for(schema_value)
474 .map_err(|e| format!("Invalid input_schema: {e}"))?;
475
476 if !validator.is_valid(&input) {
477 let errors: Vec<String> = validator
478 .iter_errors(&input)
479 .map(|e| format!("{}", e))
480 .collect();
481 return Err(format!("Input validation failed: {}", errors.join("; ")));
482 }
483 }
484
485 if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
486 for (k, v) in inp {
487 ctx.insert(k.clone(), v.clone());
488 }
489 ctx.insert("input".to_string(), input.clone());
490 }
491
492 self.status = TaskStatus::Running;
493 self.updated_at = Utc::now();
494 Ok(self)
495 }
496
497 pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
499 self.step_runs
500 .iter()
501 .enumerate()
502 .find(|(_, s)| s.status == TaskStatus::Pending)
503 .map(|(i, _)| (i, &self.definition.steps[i]))
504 }
505
506 pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
509 let mut runnable = vec![];
510 for (i, step) in self.definition.steps.iter().enumerate() {
511 if self.step_runs[i].status != TaskStatus::Pending {
512 continue;
513 }
514 let deps_met = step.depends_on.iter().all(|dep_id| {
515 self.definition
516 .steps
517 .iter()
518 .zip(self.step_runs.iter())
519 .any(|(s, sr)| {
520 &s.id == dep_id
521 && matches!(sr.status, TaskStatus::Completed | TaskStatus::Canceled)
522 })
523 });
524 if deps_met {
525 runnable.push((i, step));
526 }
527 }
528 runnable
529 }
530
531 pub fn is_complete(&self) -> bool {
532 self.step_runs.iter().all(|s| {
533 matches!(
534 s.status,
535 TaskStatus::Completed | TaskStatus::Canceled | TaskStatus::Failed
536 )
537 })
538 }
539
540 pub fn is_waiting_for_input(&self) -> bool {
541 self.step_runs
542 .iter()
543 .any(|s| s.status == TaskStatus::InputRequired)
544 }
545
546 pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
547 self.step_runs
548 .iter()
549 .enumerate()
550 .find(|(_, s)| s.status == TaskStatus::InputRequired)
551 .map(|(i, _)| (i, &self.definition.steps[i]))
552 }
553
554 pub fn resume_step(
556 &mut self,
557 step_id: &str,
558 result: serde_json::Value,
559 ) -> Result<usize, String> {
560 let idx = self
561 .step_runs
562 .iter()
563 .position(|s| s.step_id == step_id && s.status == TaskStatus::InputRequired)
564 .ok_or_else(|| {
565 format!(
566 "Step '{}' not found or not in waiting_for_input state",
567 step_id
568 )
569 })?;
570
571 self.step_runs[idx].status = TaskStatus::Completed;
572 self.step_runs[idx].result = Some(result.clone());
573 self.step_runs[idx].completed_at = Some(Utc::now());
574
575 if let Some(ctx) = self.context.as_object_mut() {
576 let steps = ctx
577 .entry("steps")
578 .or_insert(serde_json::json!({}))
579 .as_object_mut()
580 .expect("steps must be an object");
581 steps.insert(step_id.to_string(), result);
582 }
583
584 self.status = TaskStatus::Running;
585 self.updated_at = Utc::now();
586 Ok(idx)
587 }
588
589 pub fn is_stuck(&self) -> bool {
591 let has_blocked = self
592 .step_runs
593 .iter()
594 .any(|s| s.status == TaskStatus::Failed);
595 let has_pending = self
596 .step_runs
597 .iter()
598 .any(|s| s.status == TaskStatus::Pending);
599 let has_running = self
600 .step_runs
601 .iter()
602 .any(|s| s.status == TaskStatus::Running);
603
604 if !has_blocked || has_running {
605 return false;
606 }
607
608 if !has_pending {
609 return true;
610 }
611
612 !self
613 .definition
614 .steps
615 .iter()
616 .zip(self.step_runs.iter())
617 .any(|(step, run)| {
618 run.status == TaskStatus::Pending
619 && step.depends_on.iter().all(|dep_id| {
620 self.definition
621 .steps
622 .iter()
623 .zip(self.step_runs.iter())
624 .any(|(s, sr)| {
625 &s.id == dep_id
626 && matches!(
627 sr.status,
628 TaskStatus::Completed
629 | TaskStatus::Pending
630 | TaskStatus::Running
631 )
632 })
633 })
634 })
635 }
636
637 pub fn has_failed(&self) -> bool {
638 self.step_runs
639 .iter()
640 .any(|s| s.status == TaskStatus::Failed)
641 }
642
643 pub fn add_note(&mut self, step_id: &str, message: &str) {
645 self.notes.push(WorkflowNote {
646 step_id: step_id.to_string(),
647 message: message.to_string(),
648 at: Utc::now(),
649 });
650 self.updated_at = Utc::now();
651 }
652
653 pub fn detect_cycles(&self) -> Result<(), String> {
656 self.definition.detect_cycles()
657 }
658}
659
660#[derive(Debug, Clone, Serialize, Deserialize)]
667pub struct EntryPoint {
668 pub id: String,
670 pub label: String,
672 #[serde(default, skip_serializing_if = "Option::is_none")]
674 pub description: Option<String>,
675 pub starts_at: String,
677 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
680 pub preset_results: HashMap<String, serde_json::Value>,
681 #[serde(default, skip_serializing_if = "Vec::is_empty")]
683 pub required_inputs: Vec<String>,
684 #[serde(default, skip_serializing_if = "Vec::is_empty")]
689 pub triggers: Vec<distri_types::WorkflowTrigger>,
690}
691
692#[derive(Debug, Clone, Serialize, Deserialize)]
699pub struct WorkflowStep {
700 pub id: String,
701 pub label: String,
702 pub kind: StepKind,
703 #[serde(default)]
705 pub depends_on: Vec<String>,
706 #[serde(default)]
708 pub execution: StepExecution,
709 #[serde(default)]
711 pub requires: Vec<StepRequirement>,
712 #[serde(default, skip_serializing_if = "Option::is_none")]
716 pub input: Option<serde_json::Value>,
717 #[serde(default, skip_serializing_if = "Option::is_none")]
721 pub skip_if: Option<String>,
722}
723
724impl WorkflowStep {
725 fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
726 Self {
727 id: id.to_string(),
728 label: label.to_string(),
729 kind,
730 depends_on: vec![],
731 execution: StepExecution::Sequential,
732 requires: vec![],
733 input: None,
734 skip_if: None,
735 }
736 }
737
738 pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
739 Self::new_step(
740 id,
741 label,
742 StepKind::ApiCall {
743 method: method.to_string(),
744 url: url.to_string(),
745 body: None,
746 headers: None,
747 },
748 )
749 }
750
751 pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
752 Self::new_step(
753 id,
754 label,
755 StepKind::AgentRun {
756 agent_id: agent_id.to_string(),
757 prompt: prompt.to_string(),
758 tools: vec![],
759 skills: vec![],
760 model: None,
761 max_iterations: None,
762 },
763 )
764 }
765
766 pub fn script(id: &str, label: &str, command: &str) -> Self {
767 Self::new_step(
768 id,
769 label,
770 StepKind::Script {
771 command: command.to_string(),
772 args: vec![],
773 cwd: None,
774 env: None,
775 timeout_secs: None,
776 output_format: None,
777 shell: None,
778 },
779 )
780 }
781
782 pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
783 Self::new_step(
784 id,
785 label,
786 StepKind::ToolCall {
787 tool_name: tool_name.to_string(),
788 input,
789 agent_id: None,
790 },
791 )
792 }
793
794 pub fn condition(
795 id: &str,
796 label: &str,
797 expression: &str,
798 if_true: StepKind,
799 if_false: Option<StepKind>,
800 ) -> Self {
801 Self::new_step(
802 id,
803 label,
804 StepKind::Condition {
805 expression: expression.to_string(),
806 if_true: Box::new(if_true),
807 if_false: if_false.map(Box::new),
808 },
809 )
810 }
811
812 pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
813 Self::new_step(
814 id,
815 label,
816 StepKind::Checkpoint {
817 message: message.to_string(),
818 },
819 )
820 }
821
822 pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
823 Self::new_step(
824 id,
825 label,
826 StepKind::WaitForInput {
827 message: message.to_string(),
828 schema: None,
829 },
830 )
831 }
832
833 pub fn with_body(mut self, body: serde_json::Value) -> Self {
834 if let StepKind::ApiCall {
835 body: ref mut b, ..
836 } = self.kind
837 {
838 *b = Some(body);
839 }
840 self
841 }
842
843 pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
844 self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
845 self
846 }
847
848 pub fn parallel(mut self) -> Self {
849 self.execution = StepExecution::Parallel;
850 self
851 }
852
853 pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
854 self.requires = requires;
855 self
856 }
857
858 pub fn with_cwd(mut self, cwd: &str) -> Self {
859 if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
860 *c = Some(cwd.to_string());
861 }
862 self
863 }
864
865 pub fn with_timeout(mut self, secs: u64) -> Self {
866 if let StepKind::Script {
867 timeout_secs: ref mut t,
868 ..
869 } = self.kind
870 {
871 *t = Some(secs);
872 }
873 self
874 }
875
876 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
877 if let StepKind::Script { env: ref mut e, .. } = self.kind {
878 *e = Some(env);
879 }
880 self
881 }
882
883 pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
884 self.input = Some(input);
885 self
886 }
887
888 pub fn with_skip_if(mut self, expression: &str) -> Self {
889 self.skip_if = Some(expression.to_string());
890 self
891 }
892}
893
894#[derive(Debug, Clone, Serialize, Deserialize)]
899#[serde(tag = "type", rename_all = "snake_case")]
900pub enum StepKind {
901 ApiCall {
903 method: String,
904 url: String,
905 #[serde(skip_serializing_if = "Option::is_none")]
906 body: Option<serde_json::Value>,
907 #[serde(skip_serializing_if = "Option::is_none")]
908 headers: Option<HashMap<String, String>>,
909 },
910
911 Script {
913 command: String,
914 #[serde(default)]
915 args: Vec<String>,
916 #[serde(default, skip_serializing_if = "Option::is_none")]
917 cwd: Option<String>,
918 #[serde(default, skip_serializing_if = "Option::is_none")]
919 env: Option<HashMap<String, String>>,
920 #[serde(default, skip_serializing_if = "Option::is_none")]
921 timeout_secs: Option<u64>,
922 #[serde(default, skip_serializing_if = "Option::is_none")]
923 output_format: Option<ScriptOutputFormat>,
924 #[serde(default, skip_serializing_if = "Option::is_none")]
925 shell: Option<ShellType>,
926 },
927
928 AgentRun {
930 agent_id: String,
931 prompt: String,
932 #[serde(default)]
933 tools: Vec<String>,
934 #[serde(default)]
936 skills: Vec<String>,
937 #[serde(default, skip_serializing_if = "Option::is_none")]
939 model: Option<String>,
940 #[serde(default, skip_serializing_if = "Option::is_none")]
942 max_iterations: Option<u32>,
943 },
944
945 ToolCall {
947 tool_name: String,
949 input: serde_json::Value,
951 #[serde(default, skip_serializing_if = "Option::is_none")]
953 agent_id: Option<String>,
954 },
955
956 Condition {
958 expression: String,
959 if_true: Box<StepKind>,
960 #[serde(skip_serializing_if = "Option::is_none")]
961 if_false: Option<Box<StepKind>>,
962 },
963
964 Checkpoint { message: String },
966
967 WaitForInput {
971 message: String,
973 #[serde(default, skip_serializing_if = "Option::is_none")]
975 schema: Option<serde_json::Value>,
976 },
977
978 Reply {
984 text: String,
985 #[serde(default, skip_serializing_if = "Vec::is_empty")]
987 buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
988 #[serde(default, skip_serializing_if = "Option::is_none")]
990 buttons_from: Option<String>,
991 #[serde(default, skip_serializing_if = "Option::is_none")]
993 button_template:
994 Option<distri_types::channel_commands::ReplyButtonSpec>,
995 },
996}
997
998#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1007pub struct StepRequirement {
1008 pub skill: String,
1010 #[serde(default)]
1012 pub permissions: Vec<String>,
1013 #[serde(default, skip_serializing_if = "Option::is_none")]
1015 pub config: Option<serde_json::Value>,
1016}
1017
1018impl StepRequirement {
1019 pub fn native(skill: &str) -> Self {
1021 Self {
1022 skill: format!("native:{}", skill),
1023 permissions: vec![],
1024 config: None,
1025 }
1026 }
1027
1028 pub fn connection(provider: &str, service: &str) -> Self {
1030 Self {
1031 skill: format!("{}:{}", provider, service),
1032 permissions: vec![],
1033 config: None,
1034 }
1035 }
1036
1037 pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1038 self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1039 self
1040 }
1041
1042 pub fn namespace(&self) -> Option<&str> {
1044 self.skill.split(':').next()
1045 }
1046
1047 pub fn skill_name(&self) -> Option<&str> {
1049 self.skill.split(':').nth(1)
1050 }
1051
1052 pub fn is_native(&self) -> bool {
1054 self.skill.starts_with("native:")
1055 }
1056
1057 pub fn validate(&self) -> Result<(), String> {
1059 if !self.skill.contains(':') {
1060 return Err(format!(
1061 "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1062 self.skill
1063 ));
1064 }
1065
1066 if self.is_native() {
1067 let known = ["shell", "browser", "network", "agent", "tool"];
1068 if let Some(name) = self.skill_name() {
1069 if !known.contains(&name) {
1070 return Err(format!(
1071 "Unknown native skill '{}'. Known: {:?}",
1072 name, known
1073 ));
1074 }
1075 }
1076 }
1077
1078 Ok(())
1079 }
1080}
1081
1082#[derive(Debug, Clone, Serialize, Deserialize)]
1088#[serde(tag = "type", rename_all = "snake_case")]
1089pub enum CheckpointStrategy {
1090 Internal {
1092 #[serde(default, skip_serializing_if = "Option::is_none")]
1093 ttl_secs: Option<u64>,
1094 },
1095 External { tool_name: String },
1098}
1099
1100impl Default for CheckpointStrategy {
1101 fn default() -> Self {
1102 CheckpointStrategy::Internal { ttl_secs: None }
1103 }
1104}
1105
1106#[derive(Debug, Clone, Serialize, Deserialize)]
1108pub struct CheckpointMeta {
1109 pub checkpoint_id: String,
1110 pub workflow_id: String,
1111 pub step_id: String,
1112 pub created_at: DateTime<Utc>,
1113}
1114
1115#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
1129#[serde(rename_all = "snake_case")]
1130pub enum StepExecution {
1131 #[default]
1133 Sequential,
1134 Parallel,
1136}
1137
1138#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1139#[serde(rename_all = "snake_case")]
1140pub enum ScriptOutputFormat {
1141 Text,
1142 Json,
1143 Stream,
1144}
1145
1146#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1147#[serde(rename_all = "snake_case")]
1148pub enum ShellType {
1149 Bash,
1150 Sh,
1151 Zsh,
1152}
1153
1154#[derive(Debug, Clone, Serialize, Deserialize)]
1159pub struct StepResult {
1160 pub status: TaskStatus,
1161 pub result: Option<serde_json::Value>,
1162 pub error: Option<String>,
1163 #[serde(skip_serializing_if = "Option::is_none")]
1165 pub context_updates: Option<serde_json::Value>,
1166}
1167
1168impl StepResult {
1169 pub fn done(result: serde_json::Value) -> Self {
1170 Self {
1171 status: TaskStatus::Completed,
1172 result: Some(result),
1173 error: None,
1174 context_updates: None,
1175 }
1176 }
1177
1178 pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1179 Self {
1180 status: TaskStatus::Completed,
1181 result: Some(result),
1182 error: None,
1183 context_updates: Some(updates),
1184 }
1185 }
1186
1187 pub fn failed(error: &str) -> Self {
1188 Self {
1189 status: TaskStatus::Failed,
1190 result: None,
1191 error: Some(error.to_string()),
1192 context_updates: None,
1193 }
1194 }
1195
1196 pub fn skipped() -> Self {
1197 Self {
1198 status: TaskStatus::Canceled,
1199 result: None,
1200 error: None,
1201 context_updates: None,
1202 }
1203 }
1204}
1205
1206#[derive(Debug, Clone, Serialize, Deserialize)]
1211pub struct WorkflowNote {
1212 pub step_id: String,
1213 pub message: String,
1214 pub at: DateTime<Utc>,
1215}
1216
1217#[derive(Debug, Clone, Serialize, Deserialize)]
1228pub struct WorkflowStepSummary {
1229 pub id: String,
1230 pub label: String,
1231 pub status: TaskStatus,
1232 #[serde(default, skip_serializing_if = "Option::is_none")]
1233 pub result: Option<serde_json::Value>,
1234 #[serde(default, skip_serializing_if = "Option::is_none")]
1235 pub error: Option<String>,
1236}
1237
1238#[derive(Debug, Clone, Serialize, Deserialize)]
1242pub struct WorkflowRunSummary {
1243 pub workflow_id: String,
1244 pub status: TaskStatus,
1245 pub steps: Vec<WorkflowStepSummary>,
1246}
1247
1248impl WorkflowRunSummary {
1249 pub fn from_run(run: &WorkflowRun, status: TaskStatus) -> Self {
1254 let steps = run
1255 .steps()
1256 .iter()
1257 .zip(run.step_runs.iter())
1258 .map(|(step, sr)| WorkflowStepSummary {
1259 id: step.id.clone(),
1260 label: step.label.clone(),
1261 status: sr.status.clone(),
1262 result: sr.result.clone(),
1263 error: sr.error.clone(),
1264 })
1265 .collect();
1266 Self {
1267 workflow_id: run.id().to_string(),
1268 status,
1269 steps,
1270 }
1271 }
1272}