1use chrono::{DateTime, Utc};
28pub use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
/// Declarative description of a workflow: its steps plus an optional input
/// schema, a checkpoint strategy, and entry points.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
    /// Identifier of the workflow.
    pub id: String,
    /// Steps that make up the workflow; ordering between them is expressed
    /// through each step's `depends_on`.
    pub steps: Vec<WorkflowStep>,
    /// Optional schema for the workflow input. Omitted from serialized output
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<serde_json::Value>,
    /// Checkpoint strategy; `CheckpointStrategy::default()` is used when the
    /// field is absent during deserialization.
    #[serde(default)]
    pub checkpoint: CheckpointStrategy,
    /// Entry points into the workflow. Omitted from serialized output when
    /// empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub entry_points: Vec<EntryPoint>,
}
53
/// Slash commands reserved by the channel layer.
///
/// `WorkflowDefinition::validate_channel_surface` rejects any slash trigger
/// whose lowercased name or alias appears in this list.
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    "/start",
    "/stop",
    "/disconnect",
    "/reset",
    "/new",
    "/newsession",
    "/newthread",
    "/status",
    "/debug",
    "/verbose",
    "/help",
    "/switch",
    "/workspace",
    "/context",
    "/ctx",
];
73
74impl WorkflowDefinition {
75 pub fn new(steps: Vec<WorkflowStep>) -> Self {
76 Self {
77 id: uuid::Uuid::new_v4().to_string(),
78 steps,
79 input_schema: None,
80 checkpoint: CheckpointStrategy::default(),
81 entry_points: vec![],
82 }
83 }
84
85 pub fn with_id(mut self, id: &str) -> Self {
86 self.id = id.to_string();
87 self
88 }
89
90 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91 self.checkpoint = strategy;
92 self
93 }
94
95 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96 self.entry_points = entry_points;
97 self
98 }
99
100 pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102 self.entry_points.iter().find(|ep| ep.id == id)
103 }
104
105 pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109 use std::collections::{HashSet, VecDeque};
110
111 let mut reachable = HashSet::new();
112 let mut queue = VecDeque::new();
113 queue.push_back(start_step_id.to_string());
114
115 while let Some(current) = queue.pop_front() {
116 if !reachable.insert(current.clone()) {
117 continue;
118 }
119 for step in &self.steps {
120 if step.depends_on.contains(¤t) && !reachable.contains(&step.id) {
121 queue.push_back(step.id.clone());
122 }
123 }
124 }
125
126 reachable
127 }
128
129 pub fn validate_channel_surface(&self) -> Result<(), String> {
132 use distri_types::WorkflowTrigger;
133 use std::collections::HashSet;
134
135 let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
136 let mut slash_names: HashSet<String> = HashSet::new();
137 let mut callback_ids: HashSet<String> = HashSet::new();
138 let mut message_count = 0usize;
139
140 for ep in &self.entry_points {
141 if !step_ids.contains(ep.starts_at.as_str()) {
142 return Err(format!(
143 "entry point '{}' starts_at unknown step '{}'",
144 ep.id, ep.starts_at
145 ));
146 }
147 for trigger in &ep.triggers {
148 match trigger {
149 WorkflowTrigger::Slash { name, aliases, .. } => {
150 for n in std::iter::once(name).chain(aliases.iter()) {
151 let lower = n.to_lowercase();
152 if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
153 return Err(format!(
154 "slash command '{n}' shadows a built-in command"
155 ));
156 }
157 if !slash_names.insert(lower.clone()) {
158 return Err(format!(
159 "entry point '{}': slash command '{}' is already declared",
160 ep.id, n
161 ));
162 }
163 }
164 }
165 WorkflowTrigger::Callback { id, .. } => {
166 if !callback_ids.insert(id.clone()) {
167 return Err(format!(
168 "entry point '{}': callback id '{}' is already declared",
169 ep.id, id
170 ));
171 }
172 }
173 WorkflowTrigger::Message {} => message_count += 1,
174 _ => {}
177 }
178 }
179 }
180 if message_count > 1 {
181 return Err(format!(
182 "workflow declares {message_count} message catch-all entry \
183 points; at most one is allowed"
184 ));
185 }
186 for step in &self.steps {
187 if let StepKind::Reply {
188 buttons_from,
189 button_template,
190 ..
191 } = &step.kind
192 {
193 if button_template.is_some() != buttons_from.is_some() {
194 return Err(format!(
195 "reply step '{}': button_template and buttons_from \
196 must be set together",
197 step.id
198 ));
199 }
200 }
201 }
202 Ok(())
203 }
204
205 pub fn detect_cycles(&self) -> Result<(), String> {
208 use std::collections::{HashMap, HashSet};
209
210 let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
211 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
212 for step in &self.steps {
213 adj.insert(
214 step.id.as_str(),
215 step.depends_on.iter().map(|s| s.as_str()).collect(),
216 );
217 }
218
219 let mut visited = HashSet::new();
220 let mut in_stack = HashSet::new();
221
222 fn dfs<'a>(
223 node: &'a str,
224 adj: &HashMap<&'a str, Vec<&'a str>>,
225 visited: &mut HashSet<&'a str>,
226 in_stack: &mut HashSet<&'a str>,
227 path: &mut Vec<&'a str>,
228 ) -> Result<(), String> {
229 visited.insert(node);
230 in_stack.insert(node);
231 path.push(node);
232
233 if let Some(deps) = adj.get(node) {
234 for &dep in deps {
235 if !visited.contains(dep) {
236 dfs(dep, adj, visited, in_stack, path)?;
237 } else if in_stack.contains(dep) {
238 let cycle_start = path.iter().position(|&n| n == dep).unwrap();
239 let cycle: Vec<&str> = path[cycle_start..].to_vec();
240 return Err(format!(
241 "Circular dependency detected: {} → {}",
242 cycle.join(" → "),
243 dep
244 ));
245 }
246 }
247 }
248
249 in_stack.remove(node);
250 path.pop();
251 Ok(())
252 }
253
254 let mut path = Vec::new();
255 for step in &self.steps {
256 if !visited.contains(step.id.as_str()) {
257 dfs(
258 step.id.as_str(),
259 &adj,
260 &mut visited,
261 &mut in_stack,
262 &mut path,
263 )?;
264 }
265 }
266
267 for step in &self.steps {
269 for dep in &step.depends_on {
270 if !step_ids.contains(dep.as_str()) {
271 return Err(format!(
272 "Step '{}' depends on '{}' which does not exist",
273 step.id, dep
274 ));
275 }
276 }
277 }
278
279 Ok(())
280 }
281}
282
283fn default_empty_object() -> serde_json::Value {
288 serde_json::json!({})
289}
290
/// Serde default for the timestamp fields of `WorkflowRun`: the current UTC
/// time at the moment of deserialization.
fn default_now() -> DateTime<Utc> {
    Utc::now()
}
294
/// A workflow definition together with the mutable state of one execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
    /// The definition being executed. Flattened, so its fields are serialized
    /// at the same level as the run's own fields.
    #[serde(flatten)]
    pub definition: WorkflowDefinition,
    /// Overall status of the run.
    #[serde(default)]
    pub status: TaskStatus,
    /// Step index; initialised to 0 and not otherwise touched in this file.
    #[serde(default)]
    pub current_step: usize,
    /// JSON context of the run; defaults to an empty object. Step results are
    /// stored under its `steps` key by `apply_entry_point` and `resume_step`.
    #[serde(default = "default_empty_object")]
    pub context: serde_json::Value,
    /// Notes appended through `add_note`.
    #[serde(default)]
    pub notes: Vec<WorkflowNote>,
    /// Per-step run state. The methods of this type pair entries with
    /// `definition.steps` by index.
    #[serde(default)]
    pub step_runs: Vec<WorkflowStepRun>,
    /// Creation time; defaults to the current time when absent on deserialization.
    #[serde(default = "default_now")]
    pub created_at: DateTime<Utc>,
    /// Last modification time; defaults to the current time when absent on
    /// deserialization.
    #[serde(default = "default_now")]
    pub updated_at: DateTime<Utc>,
}
327
/// Execution state of a single workflow step.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowStepRun {
    /// Id of the step this state belongs to.
    pub step_id: String,
    /// Current status of the step.
    #[serde(default)]
    pub status: TaskStatus,
    /// Result value of the step, if any. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Error message, if any. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Start time, if recorded. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    /// Completion time, if recorded. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
}
343
344impl WorkflowRun {
345 pub fn new(definition: WorkflowDefinition) -> Self {
348 let step_runs = definition
349 .steps
350 .iter()
351 .map(|s| WorkflowStepRun {
352 step_id: s.id.clone(),
353 ..Default::default()
354 })
355 .collect();
356 Self {
357 definition,
358 status: TaskStatus::Pending,
359 current_step: 0,
360 context: serde_json::json!({}),
361 notes: vec![],
362 step_runs,
363 created_at: Utc::now(),
364 updated_at: Utc::now(),
365 }
366 }
367
368 pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
371 Self::new(WorkflowDefinition::new(steps))
372 }
373
374 pub fn with_context(mut self, context: serde_json::Value) -> Self {
375 self.context = context;
376 self
377 }
378
379 pub fn with_id(mut self, id: &str) -> Self {
380 self.definition.id = id.to_string();
381 self
382 }
383
384 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
385 self.definition.checkpoint = strategy;
386 self
387 }
388
389 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
390 self.definition.entry_points = entry_points;
391 self
392 }
393
394 pub fn id(&self) -> &str {
395 &self.definition.id
396 }
397
398 pub fn steps(&self) -> &[WorkflowStep] {
399 &self.definition.steps
400 }
401
402 pub fn step(&self, idx: usize) -> &WorkflowStep {
403 &self.definition.steps[idx]
404 }
405
406 pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
407 &self.step_runs[idx]
408 }
409
410 pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
411 &mut self.step_runs[idx]
412 }
413
414 pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
415 self.step_runs.iter().find(|s| s.step_id == step_id)
416 }
417
418 pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
419 self.step_runs.iter_mut().find(|s| s.step_id == step_id)
420 }
421
422 pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
426 let ep = self
427 .definition
428 .entry_points
429 .iter()
430 .find(|ep| ep.id == entry_point_id)
431 .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
432 .clone();
433
434 if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
435 return Err(format!(
436 "Entry point '{}' references step '{}' which does not exist",
437 entry_point_id, ep.starts_at
438 ));
439 }
440
441 let reachable = self.definition.reachable_from(&ep.starts_at);
442
443 for (i, step) in self.definition.steps.iter().enumerate() {
444 if !reachable.contains(&step.id) {
445 self.step_runs[i].status = TaskStatus::Canceled;
446 if let Some(result) = ep.preset_results.get(&step.id) {
447 self.step_runs[i].result = Some(result.clone());
448 }
449 }
450 }
451
452 if let Some(ctx) = self.context.as_object_mut() {
453 let steps = ctx
454 .entry("steps")
455 .or_insert(serde_json::json!({}))
456 .as_object_mut()
457 .expect("steps must be an object");
458 for (step_id, result) in &ep.preset_results {
459 steps.insert(step_id.clone(), result.clone());
460 }
461 }
462
463 Ok(self)
464 }
465
466 pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
470 if let Some(ref schema_value) = self.definition.input_schema {
471 let validator = jsonschema::validator_for(schema_value)
472 .map_err(|e| format!("Invalid input_schema: {e}"))?;
473
474 if !validator.is_valid(&input) {
475 let errors: Vec<String> = validator
476 .iter_errors(&input)
477 .map(|e| format!("{}", e))
478 .collect();
479 return Err(format!("Input validation failed: {}", errors.join("; ")));
480 }
481 }
482
483 if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
484 for (k, v) in inp {
485 ctx.insert(k.clone(), v.clone());
486 }
487 ctx.insert("input".to_string(), input.clone());
488 }
489
490 self.status = TaskStatus::Running;
491 self.updated_at = Utc::now();
492 Ok(self)
493 }
494
495 pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
497 self.step_runs
498 .iter()
499 .enumerate()
500 .find(|(_, s)| s.status == TaskStatus::Pending)
501 .map(|(i, _)| (i, &self.definition.steps[i]))
502 }
503
504 pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
507 let mut runnable = vec![];
508 for (i, step) in self.definition.steps.iter().enumerate() {
509 if self.step_runs[i].status != TaskStatus::Pending {
510 continue;
511 }
512 let deps_met = step.depends_on.iter().all(|dep_id| {
513 self.definition
514 .steps
515 .iter()
516 .zip(self.step_runs.iter())
517 .any(|(s, sr)| {
518 &s.id == dep_id
519 && matches!(sr.status, TaskStatus::Completed | TaskStatus::Canceled)
520 })
521 });
522 if deps_met {
523 runnable.push((i, step));
524 }
525 }
526 runnable
527 }
528
529 pub fn is_complete(&self) -> bool {
530 self.step_runs.iter().all(|s| {
531 matches!(
532 s.status,
533 TaskStatus::Completed | TaskStatus::Canceled | TaskStatus::Failed
534 )
535 })
536 }
537
538 pub fn is_waiting_for_input(&self) -> bool {
539 self.step_runs
540 .iter()
541 .any(|s| s.status == TaskStatus::InputRequired)
542 }
543
544 pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
545 self.step_runs
546 .iter()
547 .enumerate()
548 .find(|(_, s)| s.status == TaskStatus::InputRequired)
549 .map(|(i, _)| (i, &self.definition.steps[i]))
550 }
551
552 pub fn resume_step(
554 &mut self,
555 step_id: &str,
556 result: serde_json::Value,
557 ) -> Result<usize, String> {
558 let idx = self
559 .step_runs
560 .iter()
561 .position(|s| s.step_id == step_id && s.status == TaskStatus::InputRequired)
562 .ok_or_else(|| {
563 format!(
564 "Step '{}' not found or not in waiting_for_input state",
565 step_id
566 )
567 })?;
568
569 self.step_runs[idx].status = TaskStatus::Completed;
570 self.step_runs[idx].result = Some(result.clone());
571 self.step_runs[idx].completed_at = Some(Utc::now());
572
573 if let Some(ctx) = self.context.as_object_mut() {
574 let steps = ctx
575 .entry("steps")
576 .or_insert(serde_json::json!({}))
577 .as_object_mut()
578 .expect("steps must be an object");
579 steps.insert(step_id.to_string(), result);
580 }
581
582 self.status = TaskStatus::Running;
583 self.updated_at = Utc::now();
584 Ok(idx)
585 }
586
587 pub fn is_stuck(&self) -> bool {
589 let has_blocked = self
590 .step_runs
591 .iter()
592 .any(|s| s.status == TaskStatus::Failed);
593 let has_pending = self
594 .step_runs
595 .iter()
596 .any(|s| s.status == TaskStatus::Pending);
597 let has_running = self
598 .step_runs
599 .iter()
600 .any(|s| s.status == TaskStatus::Running);
601
602 if !has_blocked || has_running {
603 return false;
604 }
605
606 if !has_pending {
607 return true;
608 }
609
610 !self
611 .definition
612 .steps
613 .iter()
614 .zip(self.step_runs.iter())
615 .any(|(step, run)| {
616 run.status == TaskStatus::Pending
617 && step.depends_on.iter().all(|dep_id| {
618 self.definition
619 .steps
620 .iter()
621 .zip(self.step_runs.iter())
622 .any(|(s, sr)| {
623 &s.id == dep_id
624 && matches!(
625 sr.status,
626 TaskStatus::Completed
627 | TaskStatus::Pending
628 | TaskStatus::Running
629 )
630 })
631 })
632 })
633 }
634
635 pub fn has_failed(&self) -> bool {
636 self.step_runs
637 .iter()
638 .any(|s| s.status == TaskStatus::Failed)
639 }
640
641 pub fn add_note(&mut self, step_id: &str, message: &str) {
643 self.notes.push(WorkflowNote {
644 step_id: step_id.to_string(),
645 message: message.to_string(),
646 at: Utc::now(),
647 });
648 self.updated_at = Utc::now();
649 }
650
651 pub fn detect_cycles(&self) -> Result<(), String> {
654 self.definition.detect_cycles()
655 }
656}
657
/// A named way to start a workflow at a particular step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntryPoint {
    /// Identifier used to look the entry point up.
    pub id: String,
    /// Label of the entry point (presumably for display; not read in this file).
    pub label: String,
    /// Optional description. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Id of the step at which execution starts.
    pub starts_at: String,
    /// Results keyed by step id. `WorkflowRun::apply_entry_point` copies them
    /// into the run context and onto the step runs it cancels. Omitted from
    /// serialized output when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub preset_results: HashMap<String, serde_json::Value>,
    /// Names of required inputs; not consumed in this file (presumably
    /// enforced by the caller; TODO confirm). Omitted from serialized output
    /// when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub required_inputs: Vec<String>,
    /// Triggers that select this entry point; checked by
    /// `WorkflowDefinition::validate_channel_surface`. Omitted from serialized
    /// output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<distri_types::WorkflowTrigger>,
}
689
/// A single step of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStep {
    /// Identifier of the step; referenced by other steps' `depends_on`.
    pub id: String,
    /// Label of the step.
    pub label: String,
    /// What the step does.
    pub kind: StepKind,
    /// Ids of the steps this step depends on.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Execution mode; defaults to `StepExecution::Sequential`.
    #[serde(default)]
    pub execution: StepExecution,
    /// Skills the step requires.
    #[serde(default)]
    pub requires: Vec<StepRequirement>,
    /// Optional input value for the step, set by `with_input_mapping`.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input: Option<serde_json::Value>,
    /// Optional expression, set by `with_skip_if`; not evaluated in this file
    /// (presumably the step is skipped when it holds; TODO confirm). Omitted
    /// from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub skip_if: Option<String>,
}
721
722impl WorkflowStep {
723 fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
724 Self {
725 id: id.to_string(),
726 label: label.to_string(),
727 kind,
728 depends_on: vec![],
729 execution: StepExecution::Sequential,
730 requires: vec![],
731 input: None,
732 skip_if: None,
733 }
734 }
735
736 pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
737 Self::new_step(
738 id,
739 label,
740 StepKind::ApiCall {
741 method: method.to_string(),
742 url: url.to_string(),
743 body: None,
744 headers: None,
745 },
746 )
747 }
748
749 pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
750 Self::new_step(
751 id,
752 label,
753 StepKind::AgentRun {
754 agent_id: agent_id.to_string(),
755 prompt: prompt.to_string(),
756 tools: vec![],
757 skills: vec![],
758 model: None,
759 max_iterations: None,
760 },
761 )
762 }
763
764 pub fn script(id: &str, label: &str, command: &str) -> Self {
765 Self::new_step(
766 id,
767 label,
768 StepKind::Script {
769 command: command.to_string(),
770 args: vec![],
771 cwd: None,
772 env: None,
773 timeout_secs: None,
774 output_format: None,
775 shell: None,
776 },
777 )
778 }
779
780 pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
781 Self::new_step(
782 id,
783 label,
784 StepKind::ToolCall {
785 tool_name: tool_name.to_string(),
786 input,
787 agent_id: None,
788 },
789 )
790 }
791
792 pub fn condition(
793 id: &str,
794 label: &str,
795 expression: &str,
796 if_true: StepKind,
797 if_false: Option<StepKind>,
798 ) -> Self {
799 Self::new_step(
800 id,
801 label,
802 StepKind::Condition {
803 expression: expression.to_string(),
804 if_true: Box::new(if_true),
805 if_false: if_false.map(Box::new),
806 },
807 )
808 }
809
810 pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
811 Self::new_step(
812 id,
813 label,
814 StepKind::Checkpoint {
815 message: message.to_string(),
816 },
817 )
818 }
819
820 pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
821 Self::new_step(
822 id,
823 label,
824 StepKind::WaitForInput {
825 message: message.to_string(),
826 schema: None,
827 },
828 )
829 }
830
831 pub fn with_body(mut self, body: serde_json::Value) -> Self {
832 if let StepKind::ApiCall {
833 body: ref mut b, ..
834 } = self.kind
835 {
836 *b = Some(body);
837 }
838 self
839 }
840
841 pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
842 self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
843 self
844 }
845
846 pub fn parallel(mut self) -> Self {
847 self.execution = StepExecution::Parallel;
848 self
849 }
850
851 pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
852 self.requires = requires;
853 self
854 }
855
856 pub fn with_cwd(mut self, cwd: &str) -> Self {
857 if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
858 *c = Some(cwd.to_string());
859 }
860 self
861 }
862
863 pub fn with_timeout(mut self, secs: u64) -> Self {
864 if let StepKind::Script {
865 timeout_secs: ref mut t,
866 ..
867 } = self.kind
868 {
869 *t = Some(secs);
870 }
871 self
872 }
873
874 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
875 if let StepKind::Script { env: ref mut e, .. } = self.kind {
876 *e = Some(env);
877 }
878 self
879 }
880
881 pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
882 self.input = Some(input);
883 self
884 }
885
886 pub fn with_skip_if(mut self, expression: &str) -> Self {
887 self.skip_if = Some(expression.to_string());
888 self
889 }
890}
891
/// The action a workflow step performs.
///
/// Serialized as an internally tagged enum: the variant name, in snake_case,
/// is stored in a `type` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepKind {
    /// An API call described by a method, a URL, and an optional body and
    /// headers.
    ApiCall {
        method: String,
        url: String,
        // Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        body: Option<serde_json::Value>,
        // Omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        headers: Option<HashMap<String, String>>,
    },

    /// A command with arguments and optional working directory, environment,
    /// timeout, output format, and shell.
    Script {
        command: String,
        #[serde(default)]
        args: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cwd: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        env: Option<HashMap<String, String>>,
        // Timeout in seconds, as the field name states.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_secs: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        output_format: Option<ScriptOutputFormat>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        shell: Option<ShellType>,
    },

    /// A run of the agent `agent_id` with a prompt and optional tool, skill,
    /// model, and iteration settings.
    AgentRun {
        agent_id: String,
        prompt: String,
        #[serde(default)]
        tools: Vec<String>,
        #[serde(default)]
        skills: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_iterations: Option<u32>,
    },

    /// A call of the tool `tool_name` with a JSON input, optionally tied to
    /// an agent id.
    ToolCall {
        tool_name: String,
        input: serde_json::Value,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        agent_id: Option<String>,
    },

    /// A branch: an expression plus the step kind for the true case and an
    /// optional step kind for the false case.
    Condition {
        expression: String,
        if_true: Box<StepKind>,
        #[serde(skip_serializing_if = "Option::is_none")]
        if_false: Option<Box<StepKind>>,
    },

    /// A checkpoint carrying a message.
    Checkpoint { message: String },

    /// A pause for input, carrying a message and an optional schema for the
    /// expected input.
    WaitForInput {
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        schema: Option<serde_json::Value>,
    },

    /// A reply consisting of text and optional buttons.
    ///
    /// `WorkflowDefinition::validate_channel_surface` requires
    /// `buttons_from` and `button_template` to be set together.
    Reply {
        text: String,
        // Nested list of button specs (presumably rows of buttons; TODO confirm).
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        buttons_from: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        button_template: Option<distri_types::channel_commands::ReplyButtonSpec>,
    },
}
994
/// A skill that a step requires, with optional permissions and configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StepRequirement {
    /// Namespaced skill identifier of the form `namespace:name`, e.g.
    /// `native:shell` or `google:drive`.
    pub skill: String,
    /// Permissions requested for the skill.
    #[serde(default)]
    pub permissions: Vec<String>,
    /// Optional configuration value. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<serde_json::Value>,
}
1014
1015impl StepRequirement {
1016 pub fn native(skill: &str) -> Self {
1018 Self {
1019 skill: format!("native:{}", skill),
1020 permissions: vec![],
1021 config: None,
1022 }
1023 }
1024
1025 pub fn connection(provider: &str, service: &str) -> Self {
1027 Self {
1028 skill: format!("{}:{}", provider, service),
1029 permissions: vec![],
1030 config: None,
1031 }
1032 }
1033
1034 pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1035 self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1036 self
1037 }
1038
1039 pub fn namespace(&self) -> Option<&str> {
1041 self.skill.split(':').next()
1042 }
1043
1044 pub fn skill_name(&self) -> Option<&str> {
1046 self.skill.split(':').nth(1)
1047 }
1048
1049 pub fn is_native(&self) -> bool {
1051 self.skill.starts_with("native:")
1052 }
1053
1054 pub fn validate(&self) -> Result<(), String> {
1056 if !self.skill.contains(':') {
1057 return Err(format!(
1058 "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1059 self.skill
1060 ));
1061 }
1062
1063 if self.is_native() {
1064 let known = ["shell", "browser", "network", "agent", "tool"];
1065 if let Some(name) = self.skill_name() {
1066 if !known.contains(&name) {
1067 return Err(format!(
1068 "Unknown native skill '{}'. Known: {:?}",
1069 name, known
1070 ));
1071 }
1072 }
1073 }
1074
1075 Ok(())
1076 }
1077}
1078
/// How a workflow's checkpoints are handled.
///
/// Serialized as an internally tagged enum: the variant name, in snake_case,
/// is stored in a `type` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CheckpointStrategy {
    /// Internal checkpointing with an optional time-to-live in seconds.
    Internal {
        #[serde(default, skip_serializing_if = "Option::is_none")]
        ttl_secs: Option<u64>,
    },
    /// Checkpointing associated with the tool `tool_name` (presumably
    /// delegated to it; TODO confirm).
    External { tool_name: String },
}
1096
1097impl Default for CheckpointStrategy {
1098 fn default() -> Self {
1099 CheckpointStrategy::Internal { ttl_secs: None }
1100 }
1101}
1102
/// Metadata identifying a stored checkpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
    /// Identifier of the checkpoint.
    pub checkpoint_id: String,
    /// Identifier of the workflow the checkpoint belongs to.
    pub workflow_id: String,
    /// Identifier of the step the checkpoint belongs to.
    pub step_id: String,
    /// Time at which the checkpoint was created.
    pub created_at: DateTime<Utc>,
}
1111
/// Execution mode of a step; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepExecution {
    /// The default mode.
    #[default]
    Sequential,
    /// Selected through `WorkflowStep::parallel`.
    Parallel,
}
1134
/// Output format of a script step; serialized in snake_case (`text`, `json`,
/// `stream`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScriptOutputFormat {
    Text,
    Json,
    Stream,
}
1142
/// Shell selection for a script step; serialized in snake_case (`bash`, `sh`,
/// `zsh`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShellType {
    Bash,
    Sh,
    Zsh,
}
1150
/// Outcome of executing a single step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    /// Resulting status of the step.
    pub status: TaskStatus,
    /// Result value, if the step produced one.
    pub result: Option<serde_json::Value>,
    /// Error message, if the step failed.
    pub error: Option<String>,
    /// Optional updates for the run context (presumably merged by the
    /// executor; TODO confirm). Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_updates: Option<serde_json::Value>,
}
1164
1165impl StepResult {
1166 pub fn done(result: serde_json::Value) -> Self {
1167 Self {
1168 status: TaskStatus::Completed,
1169 result: Some(result),
1170 error: None,
1171 context_updates: None,
1172 }
1173 }
1174
1175 pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1176 Self {
1177 status: TaskStatus::Completed,
1178 result: Some(result),
1179 error: None,
1180 context_updates: Some(updates),
1181 }
1182 }
1183
1184 pub fn failed(error: &str) -> Self {
1185 Self {
1186 status: TaskStatus::Failed,
1187 result: None,
1188 error: Some(error.to_string()),
1189 context_updates: None,
1190 }
1191 }
1192
1193 pub fn skipped() -> Self {
1194 Self {
1195 status: TaskStatus::Canceled,
1196 result: None,
1197 error: None,
1198 context_updates: None,
1199 }
1200 }
1201}
1202
/// A timestamped note attached to a workflow run; created by
/// `WorkflowRun::add_note`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowNote {
    /// Id of the step the note refers to.
    pub step_id: String,
    /// Text of the note.
    pub message: String,
    /// Time at which the note was added.
    pub at: DateTime<Utc>,
}
1213
/// Condensed view of one step and its run state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepSummary {
    /// Id of the step.
    pub id: String,
    /// Label of the step.
    pub label: String,
    /// Status of the step's run.
    pub status: TaskStatus,
    /// Result of the step's run, if any. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Error of the step's run, if any. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
1234
/// Condensed view of a workflow run: its id, a status, and one summary per step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunSummary {
    /// Id of the workflow.
    pub workflow_id: String,
    /// Status reported for the run.
    pub status: TaskStatus,
    /// Per-step summaries, in step order.
    pub steps: Vec<WorkflowStepSummary>,
}
1244
1245impl WorkflowRunSummary {
1246 pub fn from_run(run: &WorkflowRun, status: TaskStatus) -> Self {
1251 let steps = run
1252 .steps()
1253 .iter()
1254 .zip(run.step_runs.iter())
1255 .map(|(step, sr)| WorkflowStepSummary {
1256 id: step.id.clone(),
1257 label: step.label.clone(),
1258 status: sr.status.clone(),
1259 result: sr.result.clone(),
1260 error: sr.error.clone(),
1261 })
1262 .collect();
1263 Self {
1264 workflow_id: run.id().to_string(),
1265 status,
1266 steps,
1267 }
1268 }
1269}