runtara_workflow_stdlib/
instance_output.rs

1// Copyright (C) 2025 SyncMyOrders Sp. z o.o.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! Instance output handling for workflow binaries.
4//!
//! Workflows communicate their exit state via an `output.json` file.
//! The environment reads this file to determine the next action (scheduling a wake-up, marking the instance completed, etc.).
7//!
//! The output file path is constructed from environment variables:
//! - DATA_DIR: Base data directory (defaults to the current directory, `.`)
//! - RUNTARA_TENANT_ID: Tenant identifier (defaults to `default`)
//! - RUNTARA_INSTANCE_ID: Instance identifier (defaults to `unknown`)
//!
//! Output path: $DATA_DIR/$RUNTARA_TENANT_ID/runs/$RUNTARA_INSTANCE_ID/output.json
14
15use serde::{Deserialize, Serialize};
16use std::path::PathBuf;
17
/// Exit status recorded in an instance's output file.
///
/// Variants are (de)serialized under their snake_case names, e.g.
/// `Completed` becomes the JSON string `"completed"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum InstanceOutputStatus {
    /// Instance completed successfully.
    Completed,
    /// Instance failed with an error.
    Failed,
    /// Instance suspended (paused via signal).
    Suspended,
    /// Instance is sleeping (durable sleep requested).
    Sleeping,
    /// Instance was cancelled.
    Cancelled,
}
33
/// Instance output written to output.json on exit.
///
/// Only `status` is always serialized; each optional field is left out of the
/// JSON entirely when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceOutput {
    /// The status/reason for exit.
    pub status: InstanceOutputStatus,

    /// Result data (set by the `completed` constructor). Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,

    /// Error message (set by the `failed` constructor). Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,

    /// Checkpoint ID to resume from (set by the `suspended` and `sleeping`
    /// constructors). Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub checkpoint_id: Option<String>,

    /// Wake delay in milliseconds (set by the `sleeping` constructor).
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub wake_after_ms: Option<u64>,
}
56
57impl InstanceOutput {
58    /// Create a completed output.
59    pub fn completed(result: serde_json::Value) -> Self {
60        Self {
61            status: InstanceOutputStatus::Completed,
62            result: Some(result),
63            error: None,
64            checkpoint_id: None,
65            wake_after_ms: None,
66        }
67    }
68
69    /// Create a failed output.
70    pub fn failed(error: impl Into<String>) -> Self {
71        Self {
72            status: InstanceOutputStatus::Failed,
73            result: None,
74            error: Some(error.into()),
75            checkpoint_id: None,
76            wake_after_ms: None,
77        }
78    }
79
80    /// Create a suspended output.
81    pub fn suspended(checkpoint_id: impl Into<String>) -> Self {
82        Self {
83            status: InstanceOutputStatus::Suspended,
84            result: None,
85            error: None,
86            checkpoint_id: Some(checkpoint_id.into()),
87            wake_after_ms: None,
88        }
89    }
90
91    /// Create a sleeping output.
92    pub fn sleeping(checkpoint_id: impl Into<String>, wake_after_ms: u64) -> Self {
93        Self {
94            status: InstanceOutputStatus::Sleeping,
95            result: None,
96            error: None,
97            checkpoint_id: Some(checkpoint_id.into()),
98            wake_after_ms: Some(wake_after_ms),
99        }
100    }
101
102    /// Create a cancelled output.
103    pub fn cancelled() -> Self {
104        Self {
105            status: InstanceOutputStatus::Cancelled,
106            result: None,
107            error: None,
108            checkpoint_id: None,
109            wake_after_ms: None,
110        }
111    }
112
113    /// Write instance output to the standard output file location.
114    ///
115    /// Path is determined from environment variables:
116    /// - DATA_DIR (defaults to ".")
117    /// - RUNTARA_TENANT_ID
118    /// - RUNTARA_INSTANCE_ID
119    pub fn write_to_output_file(&self) -> std::io::Result<()> {
120        let path = get_output_file_path();
121        self.write_to_file(&path)
122    }
123
124    /// Write instance output to a specific file path.
125    pub fn write_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
126        let content = serde_json::to_string_pretty(self)?;
127        if let Some(parent) = path.parent() {
128            std::fs::create_dir_all(parent)?;
129        }
130        std::fs::write(path, content)
131    }
132}
133
/// Build the output file path for the current instance from the environment.
///
/// The result is
/// `<DATA_DIR>/<RUNTARA_TENANT_ID>/runs/<RUNTARA_INSTANCE_ID>/output.json`.
/// Unset variables fall back to `"."`, `"default"` and `"unknown"` respectively.
///
/// Values are read with [`std::env::var_os`], so a value that is not valid
/// Unicode (legal for paths on Unix) is used as-is instead of being silently
/// replaced by the fallback.
pub fn get_output_file_path() -> PathBuf {
    let data_dir = std::env::var_os("DATA_DIR").unwrap_or_else(|| ".".into());
    let tenant_id = std::env::var_os("RUNTARA_TENANT_ID").unwrap_or_else(|| "default".into());
    let instance_id =
        std::env::var_os("RUNTARA_INSTANCE_ID").unwrap_or_else(|| "unknown".into());

    PathBuf::from(data_dir)
        .join(tenant_id)
        .join("runs")
        .join(instance_id)
        .join("output.json")
}
147
148/// Convenience function to write completed output.
149pub fn write_completed(result: serde_json::Value) -> std::io::Result<()> {
150    InstanceOutput::completed(result).write_to_output_file()
151}
152
153/// Convenience function to write failed output.
154pub fn write_failed(error: impl Into<String>) -> std::io::Result<()> {
155    InstanceOutput::failed(error).write_to_output_file()
156}
157
158/// Convenience function to write suspended output.
159pub fn write_suspended(checkpoint_id: impl Into<String>) -> std::io::Result<()> {
160    InstanceOutput::suspended(checkpoint_id).write_to_output_file()
161}
162
163/// Convenience function to write sleeping output.
164pub fn write_sleeping(checkpoint_id: impl Into<String>, wake_after_ms: u64) -> std::io::Result<()> {
165    InstanceOutput::sleeping(checkpoint_id, wake_after_ms).write_to_output_file()
166}
167
168/// Convenience function to write cancelled output.
169pub fn write_cancelled() -> std::io::Result<()> {
170    InstanceOutput::cancelled().write_to_output_file()
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `output` compactly and assert that every fragment occurs in
    /// the resulting JSON text.
    fn assert_json_contains(output: &InstanceOutput, fragments: &[&str]) {
        let json = serde_json::to_string(output).unwrap();
        for fragment in fragments {
            assert!(json.contains(*fragment));
        }
    }

    #[test]
    fn test_serialize_completed() {
        let output = InstanceOutput::completed(serde_json::json!({"key": "value"}));
        assert_json_contains(&output, &["\"status\":\"completed\"", "\"result\""]);
    }

    #[test]
    fn test_serialize_sleeping() {
        let output = InstanceOutput::sleeping("checkpoint-1", 3600000);
        assert_json_contains(
            &output,
            &[
                "\"status\":\"sleeping\"",
                "\"checkpoint_id\":\"checkpoint-1\"",
                "\"wake_after_ms\":3600000",
            ],
        );
    }

    #[test]
    fn test_serialize_cancelled() {
        let output = InstanceOutput::cancelled();
        assert_json_contains(&output, &["\"status\":\"cancelled\""]);
    }
}