1use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use uuid::Uuid;
11
12use crate::id::{FlowRunId, NodeId, StepRunId};
13
14#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
16pub struct Receipt {
17 pub id: ReceiptId,
19 pub flow_run_id: FlowRunId,
21 pub node_id: NodeId,
23 pub step_run_id: StepRunId,
25 pub connector: String,
27 pub timestamp: DateTime<Utc>,
29 pub attempt_id_raw: Uuid,
31 pub input_hash: String,
33 #[serde(default, skip_serializing_if = "Option::is_none")]
35 pub output_hash: Option<String>,
36 pub status: ReceiptStatus,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub error: Option<String>,
41 pub duration_ms: u64,
43}
44
45impl Receipt {
46 #[allow(clippy::too_many_arguments)]
49 pub fn new(
50 flow_run_id: FlowRunId,
51 node_id: NodeId,
52 step_run_id: StepRunId,
53 connector: String,
54 timestamp: DateTime<Utc>,
55 attempt_id_raw: Uuid,
56 input_hash: String,
57 output_hash: Option<String>,
58 status: ReceiptStatus,
59 error: Option<String>,
60 duration_ms: u64,
61 ) -> Self {
62 let id = Self::compute_id(
63 &flow_run_id,
64 &node_id,
65 &step_run_id,
66 &connector,
67 ×tamp,
68 &attempt_id_raw,
69 &input_hash,
70 output_hash.as_deref(),
71 &status,
72 error.as_deref(),
73 duration_ms,
74 );
75 Self {
76 id,
77 flow_run_id,
78 node_id,
79 step_run_id,
80 connector,
81 timestamp,
82 attempt_id_raw,
83 input_hash,
84 output_hash,
85 status,
86 error,
87 duration_ms,
88 }
89 }
90
91 #[allow(clippy::too_many_arguments)]
93 fn compute_id(
94 flow_run_id: &FlowRunId,
95 node_id: &NodeId,
96 step_run_id: &StepRunId,
97 connector: &str,
98 timestamp: &DateTime<Utc>,
99 attempt_id_raw: &Uuid,
100 input_hash: &str,
101 output_hash: Option<&str>,
102 status: &ReceiptStatus,
103 error: Option<&str>,
104 duration_ms: u64,
105 ) -> ReceiptId {
106 let mut hasher = Sha256::new();
107 hasher.update(flow_run_id.as_ref().as_bytes());
108 hasher.update(node_id.as_ref().as_bytes());
109 hasher.update(step_run_id.as_ref().as_bytes());
110 hasher.update(connector.as_bytes());
111 hasher.update(timestamp.to_rfc3339().as_bytes());
112 hasher.update(attempt_id_raw.as_bytes());
113 hasher.update(input_hash.as_bytes());
114 match output_hash {
115 Some(h) => {
116 hasher.update(b"\x01");
117 hasher.update(h.as_bytes());
118 }
119 None => hasher.update(b"\x00"),
120 }
121 hasher.update(status.as_str().as_bytes());
122 match error {
123 Some(e) => {
124 hasher.update(b"\x01");
125 hasher.update(e.as_bytes());
126 }
127 None => hasher.update(b"\x00"),
128 }
129 hasher.update(duration_ms.to_le_bytes());
130
131 let hash = hasher.finalize();
132 ReceiptId(format!("{:x}", hash))
133 }
134}
135
136#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
138#[serde(transparent)]
139pub struct ReceiptId(String);
140
141impl ReceiptId {
142 pub fn as_str(&self) -> &str {
144 &self.0
145 }
146}
147
148impl std::fmt::Display for ReceiptId {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 write!(f, "{}", self.0)
151 }
152}
153
154#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
156#[serde(rename_all = "snake_case")]
157pub enum ReceiptStatus {
158 Success,
159 Failure,
160 Timeout,
161}
162
163impl ReceiptStatus {
164 fn as_str(&self) -> &'static str {
165 match self {
166 Self::Success => "success",
167 Self::Failure => "failure",
168 Self::Timeout => "timeout",
169 }
170 }
171}
172
173pub fn sha256_hex(data: &[u8]) -> String {
176 let hash = Sha256::digest(data);
177 format!("{:x}", hash)
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use crate::id::{FlowRunId, NodeId, StepRunId};
184
185 fn sample_receipt() -> Receipt {
186 Receipt::new(
187 FlowRunId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap()),
188 NodeId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap()),
189 StepRunId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000003").unwrap()),
190 "http.request".into(),
191 DateTime::parse_from_rfc3339("2026-03-05T12:00:00Z").unwrap().with_timezone(&Utc),
192 Uuid::parse_str("00000000-0000-0000-0000-000000000004").unwrap(),
193 "abc123".into(),
194 Some("def456".into()),
195 ReceiptStatus::Success,
196 None,
197 150,
198 )
199 }
200
201 #[test]
202 fn content_addressable_id_is_deterministic() {
203 let a = sample_receipt();
204 let b = sample_receipt();
205 assert_eq!(a.id, b.id);
206 }
207
208 #[test]
209 fn changing_any_field_changes_id() {
210 let base = sample_receipt();
211
212 let modified = Receipt::new(
214 base.flow_run_id,
215 base.node_id,
216 base.step_run_id,
217 "fs.write".into(),
218 base.timestamp,
219 base.attempt_id_raw,
220 base.input_hash.clone(),
221 base.output_hash.clone(),
222 base.status,
223 base.error.clone(),
224 base.duration_ms,
225 );
226 assert_ne!(base.id, modified.id);
227
228 let modified2 = Receipt::new(
230 base.flow_run_id,
231 base.node_id,
232 base.step_run_id,
233 base.connector.clone(),
234 base.timestamp,
235 base.attempt_id_raw,
236 base.input_hash.clone(),
237 base.output_hash.clone(),
238 base.status,
239 base.error.clone(),
240 999,
241 );
242 assert_ne!(base.id, modified2.id);
243
244 let modified3 = Receipt::new(
246 base.flow_run_id,
247 base.node_id,
248 base.step_run_id,
249 base.connector.clone(),
250 base.timestamp,
251 base.attempt_id_raw,
252 base.input_hash.clone(),
253 base.output_hash.clone(),
254 ReceiptStatus::Failure,
255 Some("boom".into()),
256 base.duration_ms,
257 );
258 assert_ne!(base.id, modified3.id);
259 }
260
261 #[test]
262 fn receipt_json_roundtrip() {
263 let receipt = sample_receipt();
264 let json = serde_json::to_string(&receipt).unwrap();
265 let back: Receipt = serde_json::from_str(&json).unwrap();
266 assert_eq!(receipt, back);
267 }
268
269 #[test]
270 fn receipt_status_variants_roundtrip() {
271 for status in [ReceiptStatus::Success, ReceiptStatus::Failure, ReceiptStatus::Timeout] {
272 let json = serde_json::to_string(&status).unwrap();
273 let back: ReceiptStatus = serde_json::from_str(&json).unwrap();
274 assert_eq!(status, back);
275 }
276 }
277
278 #[test]
279 fn receipt_with_no_output_hash() {
280 let receipt = Receipt::new(
281 FlowRunId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap()),
282 NodeId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap()),
283 StepRunId::from(Uuid::parse_str("00000000-0000-0000-0000-000000000003").unwrap()),
284 "http.request".into(),
285 Utc::now(),
286 Uuid::new_v4(),
287 "abc".into(),
288 None,
289 ReceiptStatus::Failure,
290 Some("connection refused".into()),
291 0,
292 );
293 let json = serde_json::to_string(&receipt).unwrap();
294 let back: Receipt = serde_json::from_str(&json).unwrap();
295 assert_eq!(receipt, back);
296 assert!(receipt.output_hash.is_none());
297 assert!(receipt.error.is_some());
298 }
299
300 #[test]
301 fn sha256_hex_utility() {
302 let hash = sha256_hex(b"hello world");
303 assert_eq!(hash.len(), 64); assert_eq!(hash, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9");
306 }
307
308 #[test]
309 fn receipt_id_display() {
310 let receipt = sample_receipt();
311 let display = receipt.id.to_string();
312 assert_eq!(display, receipt.id.as_str());
313 assert_eq!(display.len(), 64);
314 }
315}