1use chrono::{DateTime, Utc};
28use distri_types::TaskStatus;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31
/// Static description of a workflow: its steps plus an optional input schema,
/// a checkpoint strategy and named entry points.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
    /// Workflow identifier. `WorkflowDefinition::new` fills it with a random UUID v4.
    pub id: String,
    /// Steps of the workflow; each step names its prerequisites in `depends_on`.
    pub steps: Vec<WorkflowStep>,
    /// Optional JSON Schema that `WorkflowRun::with_input` validates input against.
    /// Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<serde_json::Value>,
    /// Checkpoint strategy; `CheckpointStrategy::default()` when missing on input.
    #[serde(default)]
    pub checkpoint: CheckpointStrategy,
    /// Named entry points. Left out of the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub entry_points: Vec<EntryPoint>,
}
53
/// Command names that `WorkflowDefinition::validate_channel_surface` rejects
/// as slash-trigger names or aliases (it lower-cases the candidate before
/// comparing against this list).
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
    "/start",
    "/stop",
    "/disconnect",
    "/reset",
    "/new",
    "/newsession",
    "/newthread",
    "/status",
    "/debug",
    "/verbose",
    "/help",
    "/switch",
    "/workspace",
    "/context",
    "/ctx",
];
73
74impl WorkflowDefinition {
75 pub fn new(steps: Vec<WorkflowStep>) -> Self {
76 Self {
77 id: uuid::Uuid::new_v4().to_string(),
78 steps,
79 input_schema: None,
80 checkpoint: CheckpointStrategy::default(),
81 entry_points: vec![],
82 }
83 }
84
85 pub fn with_id(mut self, id: &str) -> Self {
86 self.id = id.to_string();
87 self
88 }
89
90 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
91 self.checkpoint = strategy;
92 self
93 }
94
95 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
96 self.entry_points = entry_points;
97 self
98 }
99
100 pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
102 self.entry_points.iter().find(|ep| ep.id == id)
103 }
104
105 pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
109 use std::collections::{HashSet, VecDeque};
110
111 let mut reachable = HashSet::new();
112 let mut queue = VecDeque::new();
113 queue.push_back(start_step_id.to_string());
114
115 while let Some(current) = queue.pop_front() {
116 if !reachable.insert(current.clone()) {
117 continue;
118 }
119 for step in &self.steps {
120 if step.depends_on.contains(¤t) && !reachable.contains(&step.id) {
121 queue.push_back(step.id.clone());
122 }
123 }
124 }
125
126 reachable
127 }
128
129 pub fn validate_channel_surface(&self) -> Result<(), String> {
132 use distri_types::channel_commands::ChannelTrigger;
133 use std::collections::HashSet;
134
135 let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
136 let mut slash_names: HashSet<String> = HashSet::new();
137 let mut callback_ids: HashSet<String> = HashSet::new();
138 let mut message_count = 0usize;
139
140 for ep in &self.entry_points {
141 if !step_ids.contains(ep.starts_at.as_str()) {
142 return Err(format!(
143 "entry point '{}' starts_at unknown step '{}'",
144 ep.id, ep.starts_at
145 ));
146 }
147 let Some(trigger) = &ep.trigger else { continue };
148 match trigger {
149 ChannelTrigger::Slash { name, aliases, .. } => {
150 for n in std::iter::once(name).chain(aliases.iter()) {
151 let lower = n.to_lowercase();
152 if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
153 return Err(format!("slash command '{n}' shadows a built-in command"));
154 }
155 if !slash_names.insert(lower.clone()) {
156 return Err(format!(
157 "entry point '{}': slash command '{}' is already declared",
158 ep.id, n
159 ));
160 }
161 }
162 }
163 ChannelTrigger::Callback { id, .. } => {
164 if !callback_ids.insert(id.clone()) {
167 return Err(format!(
168 "entry point '{}': callback id '{}' is already declared",
169 ep.id, id
170 ));
171 }
172 }
173 ChannelTrigger::Message {} => message_count += 1,
174 }
175 }
176 if message_count > 1 {
177 return Err(format!(
178 "workflow declares {message_count} message catch-all entry \
179 points; at most one is allowed"
180 ));
181 }
182 for step in &self.steps {
183 if let StepKind::Reply {
184 buttons_from,
185 button_template,
186 ..
187 } = &step.kind
188 {
189 if button_template.is_some() != buttons_from.is_some() {
190 return Err(format!(
191 "reply step '{}': button_template and buttons_from \
192 must be set together",
193 step.id
194 ));
195 }
196 }
197 }
198 Ok(())
199 }
200
201 pub fn detect_cycles(&self) -> Result<(), String> {
204 use std::collections::{HashMap, HashSet};
205
206 let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
207 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
208 for step in &self.steps {
209 adj.insert(
210 step.id.as_str(),
211 step.depends_on.iter().map(|s| s.as_str()).collect(),
212 );
213 }
214
215 let mut visited = HashSet::new();
216 let mut in_stack = HashSet::new();
217
218 fn dfs<'a>(
219 node: &'a str,
220 adj: &HashMap<&'a str, Vec<&'a str>>,
221 visited: &mut HashSet<&'a str>,
222 in_stack: &mut HashSet<&'a str>,
223 path: &mut Vec<&'a str>,
224 ) -> Result<(), String> {
225 visited.insert(node);
226 in_stack.insert(node);
227 path.push(node);
228
229 if let Some(deps) = adj.get(node) {
230 for &dep in deps {
231 if !visited.contains(dep) {
232 dfs(dep, adj, visited, in_stack, path)?;
233 } else if in_stack.contains(dep) {
234 let cycle_start = path.iter().position(|&n| n == dep).unwrap();
235 let cycle: Vec<&str> = path[cycle_start..].to_vec();
236 return Err(format!(
237 "Circular dependency detected: {} → {}",
238 cycle.join(" → "),
239 dep
240 ));
241 }
242 }
243 }
244
245 in_stack.remove(node);
246 path.pop();
247 Ok(())
248 }
249
250 let mut path = Vec::new();
251 for step in &self.steps {
252 if !visited.contains(step.id.as_str()) {
253 dfs(
254 step.id.as_str(),
255 &adj,
256 &mut visited,
257 &mut in_stack,
258 &mut path,
259 )?;
260 }
261 }
262
263 for step in &self.steps {
265 for dep in &step.depends_on {
266 if !step_ids.contains(dep.as_str()) {
267 return Err(format!(
268 "Step '{}' depends on '{}' which does not exist",
269 step.id, dep
270 ));
271 }
272 }
273 }
274
275 Ok(())
276 }
277}
278
279fn default_empty_object() -> serde_json::Value {
284 serde_json::json!({})
285}
286
/// Serde default for timestamp fields: the current UTC time at the moment of the call.
fn default_now() -> DateTime<Utc> {
    Utc::now()
}
290
/// A workflow definition together with its execution state.
///
/// The definition is flattened, so its fields (`id`, `steps`, ...) sit next to
/// the run-state fields in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
    /// The workflow being executed; serialized inline via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub definition: WorkflowDefinition,
    /// Overall run status; `WorkflowStatus::Pending` when missing on input.
    #[serde(default)]
    pub status: WorkflowStatus,
    /// Set to 0 by `WorkflowRun::new`.
    /// NOTE(review): presumably an index into `steps` maintained by the executor — confirm.
    #[serde(default)]
    pub current_step: usize,
    /// JSON context for the run; defaults to `{}`. Methods of `WorkflowRun`
    /// store input fields at the top level and step results under `"steps"`.
    #[serde(default = "default_empty_object")]
    pub context: serde_json::Value,
    /// Notes appended through `WorkflowRun::add_note`.
    #[serde(default)]
    pub notes: Vec<WorkflowNote>,
    /// Per-step run state. `WorkflowRun::new` creates one entry per step, in
    /// step order; this defaults to empty when missing on input.
    #[serde(default)]
    pub step_runs: Vec<WorkflowStepRun>,
    /// Creation time; defaults to "now" when missing on input.
    #[serde(default = "default_now")]
    pub created_at: DateTime<Utc>,
    /// Last modification time; defaults to "now" when missing on input.
    #[serde(default = "default_now")]
    pub updated_at: DateTime<Utc>,
}
323
/// Execution state of a single workflow step.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowStepRun {
    /// Id of the step this state belongs to.
    pub step_id: String,
    /// Current status; `StepStatus::Pending` by default.
    #[serde(default)]
    pub status: StepStatus,
    /// Result value, if one has been recorded. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Error message, if any. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Start timestamp, if recorded. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    /// Completion timestamp (set by `WorkflowRun::resume_step`, among
    /// possibly others). Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
}
339
340impl WorkflowRun {
341 pub fn new(definition: WorkflowDefinition) -> Self {
344 let step_runs = definition
345 .steps
346 .iter()
347 .map(|s| WorkflowStepRun {
348 step_id: s.id.clone(),
349 ..Default::default()
350 })
351 .collect();
352 Self {
353 definition,
354 status: WorkflowStatus::Pending,
355 current_step: 0,
356 context: serde_json::json!({}),
357 notes: vec![],
358 step_runs,
359 created_at: Utc::now(),
360 updated_at: Utc::now(),
361 }
362 }
363
364 pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
367 Self::new(WorkflowDefinition::new(steps))
368 }
369
370 pub fn with_context(mut self, context: serde_json::Value) -> Self {
371 self.context = context;
372 self
373 }
374
375 pub fn with_id(mut self, id: &str) -> Self {
376 self.definition.id = id.to_string();
377 self
378 }
379
380 pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
381 self.definition.checkpoint = strategy;
382 self
383 }
384
385 pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
386 self.definition.entry_points = entry_points;
387 self
388 }
389
390 pub fn id(&self) -> &str {
391 &self.definition.id
392 }
393
394 pub fn steps(&self) -> &[WorkflowStep] {
395 &self.definition.steps
396 }
397
398 pub fn step(&self, idx: usize) -> &WorkflowStep {
399 &self.definition.steps[idx]
400 }
401
402 pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
403 &self.step_runs[idx]
404 }
405
406 pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
407 &mut self.step_runs[idx]
408 }
409
410 pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
411 self.step_runs.iter().find(|s| s.step_id == step_id)
412 }
413
414 pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
415 self.step_runs.iter_mut().find(|s| s.step_id == step_id)
416 }
417
418 pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
422 let ep = self
423 .definition
424 .entry_points
425 .iter()
426 .find(|ep| ep.id == entry_point_id)
427 .ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
428 .clone();
429
430 if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
431 return Err(format!(
432 "Entry point '{}' references step '{}' which does not exist",
433 entry_point_id, ep.starts_at
434 ));
435 }
436
437 let reachable = self.definition.reachable_from(&ep.starts_at);
438
439 for (i, step) in self.definition.steps.iter().enumerate() {
440 if !reachable.contains(&step.id) {
441 self.step_runs[i].status = StepStatus::Skipped;
442 if let Some(result) = ep.preset_results.get(&step.id) {
443 self.step_runs[i].result = Some(result.clone());
444 }
445 }
446 }
447
448 if let Some(ctx) = self.context.as_object_mut() {
449 let steps = ctx
450 .entry("steps")
451 .or_insert(serde_json::json!({}))
452 .as_object_mut()
453 .expect("steps must be an object");
454 for (step_id, result) in &ep.preset_results {
455 steps.insert(step_id.clone(), result.clone());
456 }
457 }
458
459 Ok(self)
460 }
461
462 pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
466 if let Some(ref schema_value) = self.definition.input_schema {
467 let validator = jsonschema::validator_for(schema_value)
468 .map_err(|e| format!("Invalid input_schema: {e}"))?;
469
470 if !validator.is_valid(&input) {
471 let errors: Vec<String> = validator
472 .iter_errors(&input)
473 .map(|e| format!("{}", e))
474 .collect();
475 return Err(format!("Input validation failed: {}", errors.join("; ")));
476 }
477 }
478
479 if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
480 for (k, v) in inp {
481 ctx.insert(k.clone(), v.clone());
482 }
483 ctx.insert("input".to_string(), input.clone());
484 }
485
486 self.status = WorkflowStatus::Running;
487 self.updated_at = Utc::now();
488 Ok(self)
489 }
490
491 pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
493 self.step_runs
494 .iter()
495 .enumerate()
496 .find(|(_, s)| s.status == StepStatus::Pending)
497 .map(|(i, _)| (i, &self.definition.steps[i]))
498 }
499
500 pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
503 let mut runnable = vec![];
504 for (i, step) in self.definition.steps.iter().enumerate() {
505 if self.step_runs[i].status != StepStatus::Pending {
506 continue;
507 }
508 let deps_met = step.depends_on.iter().all(|dep_id| {
509 self.definition
510 .steps
511 .iter()
512 .zip(self.step_runs.iter())
513 .any(|(s, sr)| {
514 &s.id == dep_id
515 && matches!(sr.status, StepStatus::Done | StepStatus::Skipped)
516 })
517 });
518 if deps_met {
519 runnable.push((i, step));
520 }
521 }
522 runnable
523 }
524
525 pub fn is_complete(&self) -> bool {
526 self.step_runs.iter().all(|s| {
527 matches!(
528 s.status,
529 StepStatus::Done | StepStatus::Skipped | StepStatus::Blocked
530 )
531 })
532 }
533
534 pub fn is_waiting_for_input(&self) -> bool {
535 self.step_runs
536 .iter()
537 .any(|s| s.status == StepStatus::WaitingForInput)
538 }
539
540 pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
541 self.step_runs
542 .iter()
543 .enumerate()
544 .find(|(_, s)| s.status == StepStatus::WaitingForInput)
545 .map(|(i, _)| (i, &self.definition.steps[i]))
546 }
547
548 pub fn resume_step(
550 &mut self,
551 step_id: &str,
552 result: serde_json::Value,
553 ) -> Result<usize, String> {
554 let idx = self
555 .step_runs
556 .iter()
557 .position(|s| s.step_id == step_id && s.status == StepStatus::WaitingForInput)
558 .ok_or_else(|| {
559 format!(
560 "Step '{}' not found or not in waiting_for_input state",
561 step_id
562 )
563 })?;
564
565 self.step_runs[idx].status = StepStatus::Done;
566 self.step_runs[idx].result = Some(result.clone());
567 self.step_runs[idx].completed_at = Some(Utc::now());
568
569 if let Some(ctx) = self.context.as_object_mut() {
570 let steps = ctx
571 .entry("steps")
572 .or_insert(serde_json::json!({}))
573 .as_object_mut()
574 .expect("steps must be an object");
575 steps.insert(step_id.to_string(), result);
576 }
577
578 self.status = WorkflowStatus::Running;
579 self.updated_at = Utc::now();
580 Ok(idx)
581 }
582
583 pub fn is_stuck(&self) -> bool {
585 let has_blocked = self
586 .step_runs
587 .iter()
588 .any(|s| s.status == StepStatus::Blocked);
589 let has_pending = self
590 .step_runs
591 .iter()
592 .any(|s| s.status == StepStatus::Pending);
593 let has_running = self
594 .step_runs
595 .iter()
596 .any(|s| s.status == StepStatus::Running);
597
598 if !has_blocked || has_running {
599 return false;
600 }
601
602 if !has_pending {
603 return true;
604 }
605
606 !self
607 .definition
608 .steps
609 .iter()
610 .zip(self.step_runs.iter())
611 .any(|(step, run)| {
612 run.status == StepStatus::Pending
613 && step.depends_on.iter().all(|dep_id| {
614 self.definition
615 .steps
616 .iter()
617 .zip(self.step_runs.iter())
618 .any(|(s, sr)| {
619 &s.id == dep_id
620 && matches!(
621 sr.status,
622 StepStatus::Done
623 | StepStatus::Pending
624 | StepStatus::Running
625 )
626 })
627 })
628 })
629 }
630
631 pub fn has_failed(&self) -> bool {
632 self.step_runs
633 .iter()
634 .any(|s| s.status == StepStatus::Failed)
635 }
636
637 pub fn add_note(&mut self, step_id: &str, message: &str) {
639 self.notes.push(WorkflowNote {
640 step_id: step_id.to_string(),
641 message: message.to_string(),
642 at: Utc::now(),
643 });
644 self.updated_at = Utc::now();
645 }
646
647 pub fn detect_cycles(&self) -> Result<(), String> {
650 self.definition.detect_cycles()
651 }
652}
653
/// A named way of starting a workflow at a particular step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntryPoint {
    /// Identifier used to look the entry point up.
    pub id: String,
    /// Label for the entry point.
    pub label: String,
    /// Optional description. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Id of the step to start at. `WorkflowRun::apply_entry_point` skips
    /// every step that is not reachable from it.
    pub starts_at: String,
    /// Results keyed by step id that `WorkflowRun::apply_entry_point` copies
    /// into the run context (and into skipped step runs). Omitted when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub preset_results: HashMap<String, serde_json::Value>,
    /// NOTE(review): presumably names of inputs the caller must supply; not
    /// read anywhere in the visible code — confirm. Omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub required_inputs: Vec<String>,
    /// Optional channel trigger, checked by
    /// `WorkflowDefinition::validate_channel_surface`. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger: Option<distri_types::channel_commands::ChannelTrigger>,
}
684
/// One step of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStep {
    /// Step identifier, referenced by other steps' `depends_on`.
    pub id: String,
    /// Label for the step.
    pub label: String,
    /// What the step does.
    pub kind: StepKind,
    /// Ids of steps that must be `Done` or `Skipped` before this one is
    /// runnable (see `WorkflowRun::runnable_steps`). Empty by default.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Execution mode; `StepExecution::Sequential` by default.
    #[serde(default)]
    pub execution: StepExecution,
    /// Skills this step requires. Empty by default.
    #[serde(default)]
    pub requires: Vec<StepRequirement>,
    /// Optional input value set via `WorkflowStep::with_input_mapping`.
    /// NOTE(review): how it is interpreted is not visible here — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input: Option<serde_json::Value>,
    /// Optional expression set via `WorkflowStep::with_skip_if`.
    /// NOTE(review): presumably the step is skipped when it evaluates true — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub skip_if: Option<String>,
}
716
717impl WorkflowStep {
718 fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
719 Self {
720 id: id.to_string(),
721 label: label.to_string(),
722 kind,
723 depends_on: vec![],
724 execution: StepExecution::Sequential,
725 requires: vec![],
726 input: None,
727 skip_if: None,
728 }
729 }
730
731 pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
732 Self::new_step(
733 id,
734 label,
735 StepKind::ApiCall {
736 method: method.to_string(),
737 url: url.to_string(),
738 body: None,
739 headers: None,
740 },
741 )
742 }
743
744 pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
745 Self::new_step(
746 id,
747 label,
748 StepKind::AgentRun {
749 agent_id: agent_id.to_string(),
750 prompt: prompt.to_string(),
751 tools: vec![],
752 skills: vec![],
753 model: None,
754 max_iterations: None,
755 },
756 )
757 }
758
759 pub fn script(id: &str, label: &str, command: &str) -> Self {
760 Self::new_step(
761 id,
762 label,
763 StepKind::Script {
764 command: command.to_string(),
765 args: vec![],
766 cwd: None,
767 env: None,
768 timeout_secs: None,
769 output_format: None,
770 shell: None,
771 },
772 )
773 }
774
775 pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
776 Self::new_step(
777 id,
778 label,
779 StepKind::ToolCall {
780 tool_name: tool_name.to_string(),
781 input,
782 agent_id: None,
783 },
784 )
785 }
786
787 pub fn condition(
788 id: &str,
789 label: &str,
790 expression: &str,
791 if_true: StepKind,
792 if_false: Option<StepKind>,
793 ) -> Self {
794 Self::new_step(
795 id,
796 label,
797 StepKind::Condition {
798 expression: expression.to_string(),
799 if_true: Box::new(if_true),
800 if_false: if_false.map(Box::new),
801 },
802 )
803 }
804
805 pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
806 Self::new_step(
807 id,
808 label,
809 StepKind::Checkpoint {
810 message: message.to_string(),
811 },
812 )
813 }
814
815 pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
816 Self::new_step(
817 id,
818 label,
819 StepKind::WaitForInput {
820 message: message.to_string(),
821 schema: None,
822 },
823 )
824 }
825
826 pub fn with_body(mut self, body: serde_json::Value) -> Self {
827 if let StepKind::ApiCall {
828 body: ref mut b, ..
829 } = self.kind
830 {
831 *b = Some(body);
832 }
833 self
834 }
835
836 pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
837 self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
838 self
839 }
840
841 pub fn parallel(mut self) -> Self {
842 self.execution = StepExecution::Parallel;
843 self
844 }
845
846 pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
847 self.requires = requires;
848 self
849 }
850
851 pub fn with_cwd(mut self, cwd: &str) -> Self {
852 if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
853 *c = Some(cwd.to_string());
854 }
855 self
856 }
857
858 pub fn with_timeout(mut self, secs: u64) -> Self {
859 if let StepKind::Script {
860 timeout_secs: ref mut t,
861 ..
862 } = self.kind
863 {
864 *t = Some(secs);
865 }
866 self
867 }
868
869 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
870 if let StepKind::Script { env: ref mut e, .. } = self.kind {
871 *e = Some(env);
872 }
873 self
874 }
875
876 pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
877 self.input = Some(input);
878 self
879 }
880
881 pub fn with_skip_if(mut self, expression: &str) -> Self {
882 self.skip_if = Some(expression.to_string());
883 self
884 }
885}
886
/// The action a workflow step performs.
///
/// Serialized with an internal `"type"` tag whose value is the variant name
/// in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepKind {
    /// A call described by a method and a URL, with optional body and headers.
    ApiCall {
        method: String,
        url: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        body: Option<serde_json::Value>,
        #[serde(skip_serializing_if = "Option::is_none")]
        headers: Option<HashMap<String, String>>,
    },

    /// A command with arguments and optional working directory, environment,
    /// timeout, output format and shell.
    Script {
        command: String,
        #[serde(default)]
        args: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cwd: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        env: Option<HashMap<String, String>>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_secs: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        output_format: Option<ScriptOutputFormat>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        shell: Option<ShellType>,
    },

    /// An agent invocation identified by agent id and prompt, with optional
    /// tool and skill lists, model and iteration limit.
    AgentRun {
        agent_id: String,
        prompt: String,
        #[serde(default)]
        tools: Vec<String>,
        #[serde(default)]
        skills: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        max_iterations: Option<u32>,
    },

    /// A call to a named tool with a JSON input and an optional agent id.
    ToolCall {
        tool_name: String,
        input: serde_json::Value,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        agent_id: Option<String>,
    },

    /// An expression with a nested kind for the true branch and an optional
    /// nested kind for the false branch.
    Condition {
        expression: String,
        if_true: Box<StepKind>,
        #[serde(skip_serializing_if = "Option::is_none")]
        if_false: Option<Box<StepKind>>,
    },

    /// A checkpoint carrying a message.
    Checkpoint { message: String },

    /// A pause for input, carrying a message and an optional JSON schema.
    WaitForInput {
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        schema: Option<serde_json::Value>,
    },

    /// A reply with text and optional buttons.
    /// `WorkflowDefinition::validate_channel_surface` requires
    /// `buttons_from` and `button_template` to be set together.
    Reply {
        text: String,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        buttons_from: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        button_template: Option<distri_types::channel_commands::ReplyButtonSpec>,
    },
}
989
/// A skill a step needs, with optional permissions and configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StepRequirement {
    /// Namespaced skill identifier of the form `namespace:name`
    /// (e.g. `native:shell`); see `StepRequirement::validate`.
    pub skill: String,
    /// Permission names attached to the requirement. Empty by default.
    #[serde(default)]
    pub permissions: Vec<String>,
    /// Optional free-form configuration. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<serde_json::Value>,
}
1009
1010impl StepRequirement {
1011 pub fn native(skill: &str) -> Self {
1013 Self {
1014 skill: format!("native:{}", skill),
1015 permissions: vec![],
1016 config: None,
1017 }
1018 }
1019
1020 pub fn connection(provider: &str, service: &str) -> Self {
1022 Self {
1023 skill: format!("{}:{}", provider, service),
1024 permissions: vec![],
1025 config: None,
1026 }
1027 }
1028
1029 pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
1030 self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
1031 self
1032 }
1033
1034 pub fn namespace(&self) -> Option<&str> {
1036 self.skill.split(':').next()
1037 }
1038
1039 pub fn skill_name(&self) -> Option<&str> {
1041 self.skill.split(':').nth(1)
1042 }
1043
1044 pub fn is_native(&self) -> bool {
1046 self.skill.starts_with("native:")
1047 }
1048
1049 pub fn validate(&self) -> Result<(), String> {
1051 if !self.skill.contains(':') {
1052 return Err(format!(
1053 "Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
1054 self.skill
1055 ));
1056 }
1057
1058 if self.is_native() {
1059 let known = ["shell", "browser", "network", "agent", "tool"];
1060 if let Some(name) = self.skill_name() {
1061 if !known.contains(&name) {
1062 return Err(format!(
1063 "Unknown native skill '{}'. Known: {:?}",
1064 name, known
1065 ));
1066 }
1067 }
1068 }
1069
1070 Ok(())
1071 }
1072}
1073
/// How a workflow's checkpoints are handled.
///
/// Serialized with an internal `"type"` tag (`"internal"` / `"external"`).
/// The default is `Internal` with no TTL.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CheckpointStrategy {
    /// Internal checkpointing with an optional time-to-live in seconds.
    Internal {
        #[serde(default, skip_serializing_if = "Option::is_none")]
        ttl_secs: Option<u64>,
    },
    /// Checkpointing through the named tool.
    /// NOTE(review): how the tool is invoked is not visible here — confirm.
    External { tool_name: String },
}
1091
1092impl Default for CheckpointStrategy {
1093 fn default() -> Self {
1094 CheckpointStrategy::Internal { ttl_secs: None }
1095 }
1096}
1097
/// Metadata identifying a stored checkpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
    /// Identifier of the checkpoint itself.
    pub checkpoint_id: String,
    /// Id of the workflow the checkpoint belongs to.
    pub workflow_id: String,
    /// Id of the step the checkpoint is associated with.
    pub step_id: String,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
}
1106
/// Overall status of a workflow run; serialized in snake_case.
///
/// Converts into `TaskStatus` via `From`: `Paused` maps to `InputRequired`
/// and `Blocked` maps to `Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowStatus {
    /// Initial state (the default).
    #[default]
    Pending,
    Running,
    Paused,
    Completed,
    Failed,
    Blocked,
}
1126
1127impl From<WorkflowStatus> for TaskStatus {
1128 fn from(s: WorkflowStatus) -> Self {
1129 match s {
1130 WorkflowStatus::Pending => TaskStatus::Pending,
1131 WorkflowStatus::Running => TaskStatus::Running,
1132 WorkflowStatus::Paused => TaskStatus::InputRequired,
1133 WorkflowStatus::Completed => TaskStatus::Completed,
1134 WorkflowStatus::Failed => TaskStatus::Failed,
1135 WorkflowStatus::Blocked => TaskStatus::Failed,
1138 }
1139 }
1140}
1141
/// Status of a single step run; serialized in snake_case.
///
/// Converts into `TaskStatus` via `From`: `Blocked` maps to `Failed`,
/// `Skipped` to `Canceled` and `WaitingForInput` to `InputRequired`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
    /// Initial state (the default).
    #[default]
    Pending,
    Blocked,
    Running,
    Done,
    Failed,
    Skipped,
    /// Set back to `Done` by `WorkflowRun::resume_step`.
    WaitingForInput,
}
1160
1161impl From<StepStatus> for TaskStatus {
1162 fn from(s: StepStatus) -> Self {
1163 match s {
1164 StepStatus::Pending => TaskStatus::Pending,
1165 StepStatus::Blocked => TaskStatus::Failed,
1168 StepStatus::Running => TaskStatus::Running,
1169 StepStatus::Done => TaskStatus::Completed,
1170 StepStatus::Failed => TaskStatus::Failed,
1171 StepStatus::Skipped => TaskStatus::Canceled,
1173 StepStatus::WaitingForInput => TaskStatus::InputRequired,
1174 }
1175 }
1176}
1177
/// Execution mode of a step; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepExecution {
    /// The default mode.
    #[default]
    Sequential,
    /// Selected through `WorkflowStep::parallel`.
    Parallel,
}
1187
/// Output format option of a `StepKind::Script` step; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScriptOutputFormat {
    Text,
    Json,
    Stream,
}
1195
/// Shell option of a `StepKind::Script` step; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShellType {
    Bash,
    Sh,
    Zsh,
}
1203
/// Outcome of executing a step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    /// Resulting status of the step.
    pub status: StepStatus,
    /// Result value, when the step produced one.
    pub result: Option<serde_json::Value>,
    /// Error message, when the step failed.
    pub error: Option<String>,
    /// Optional context updates accompanying the result (see
    /// `StepResult::done_with_context`). Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context_updates: Option<serde_json::Value>,
}
1217
1218impl StepResult {
1219 pub fn done(result: serde_json::Value) -> Self {
1220 Self {
1221 status: StepStatus::Done,
1222 result: Some(result),
1223 error: None,
1224 context_updates: None,
1225 }
1226 }
1227
1228 pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
1229 Self {
1230 status: StepStatus::Done,
1231 result: Some(result),
1232 error: None,
1233 context_updates: Some(updates),
1234 }
1235 }
1236
1237 pub fn failed(error: &str) -> Self {
1238 Self {
1239 status: StepStatus::Failed,
1240 result: None,
1241 error: Some(error.to_string()),
1242 context_updates: None,
1243 }
1244 }
1245
1246 pub fn skipped() -> Self {
1247 Self {
1248 status: StepStatus::Skipped,
1249 result: None,
1250 error: None,
1251 context_updates: None,
1252 }
1253 }
1254}
1255
/// A timestamped note attached to a workflow run (see `WorkflowRun::add_note`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowNote {
    /// Id of the step the note refers to.
    pub step_id: String,
    /// Note text.
    pub message: String,
    /// Time the note was added (UTC).
    pub at: DateTime<Utc>,
}
1266
/// Summary of one step, combining definition data (`id`, `label`) with run
/// state converted to `TaskStatus`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepSummary {
    /// Step id.
    pub id: String,
    /// Step label.
    pub label: String,
    /// Step status expressed as a `TaskStatus`.
    pub status: TaskStatus,
    /// Step result, if any. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Step error, if any. Omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
1287
/// Summary of a workflow run: its id, overall status and per-step summaries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunSummary {
    /// Id of the workflow.
    pub workflow_id: String,
    /// Overall status expressed as a `TaskStatus`.
    pub status: TaskStatus,
    /// One summary per step, in step order.
    pub steps: Vec<WorkflowStepSummary>,
}
1297
1298impl WorkflowRunSummary {
1299 pub fn from_run(run: &WorkflowRun, status: WorkflowStatus) -> Self {
1304 let steps = run
1305 .steps()
1306 .iter()
1307 .zip(run.step_runs.iter())
1308 .map(|(step, sr)| WorkflowStepSummary {
1309 id: step.id.clone(),
1310 label: step.label.clone(),
1311 status: sr.status.into(),
1312 result: sr.result.clone(),
1313 error: sr.error.clone(),
1314 })
1315 .collect();
1316 Self {
1317 workflow_id: run.id().to_string(),
1318 status: status.into(),
1319 steps,
1320 }
1321 }
1322}
1323
/// Events describing the progress of a workflow run.
///
/// Serialized with an internal `"event"` tag whose value is the variant name
/// in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum WorkflowEvent {
    /// Carries the workflow id and its total number of steps.
    WorkflowStarted {
        workflow_id: String,
        total_steps: usize,
    },
    /// Carries the id and label of the step that started.
    StepStarted {
        workflow_id: String,
        step_id: String,
        step_label: String,
    },
    /// Carries the id and label of the completed step and its optional result.
    StepCompleted {
        workflow_id: String,
        step_id: String,
        step_label: String,
        result: Option<serde_json::Value>,
    },
    /// Carries the id and label of the failed step and the error message.
    StepFailed {
        workflow_id: String,
        step_id: String,
        step_label: String,
        error: String,
    },
    /// Carries the id and label of the waiting step plus a message and an
    /// optional JSON schema.
    StepWaiting {
        workflow_id: String,
        step_id: String,
        step_label: String,
        message: String,
        schema: Option<serde_json::Value>,
    },
    /// Carries the final status and the counts of done and failed steps.
    WorkflowCompleted {
        workflow_id: String,
        status: WorkflowStatus,
        steps_done: usize,
        steps_failed: usize,
    },
}