1use serde::{Deserialize, Serialize};
2
3pub const DOC_TYPE_INSTANCE: &str = "instance";
8pub const DOC_TYPE_HISTORY: &str = "history";
9pub const DOC_TYPE_ORCH_QUEUE: &str = "orch_queue";
10pub const DOC_TYPE_WORKER_QUEUE: &str = "worker_queue";
11pub const DOC_TYPE_OUTBOX_INTENT: &str = "outbox_intent";
12pub const DOC_TYPE_SESSION: &str = "session";
13pub const DOC_TYPE_KV: &str = "kv";
14pub const DOC_TYPE_KV_DELTA: &str = "kv_delta";
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25pub struct InstanceDocument {
26 pub id: String,
27 pub instance_id: String,
28 #[serde(rename = "type")]
29 pub doc_type: String,
30
31 pub orchestration_name: String,
32 pub orchestration_version: String,
33 pub current_execution_id: u64,
34 pub status: String,
35 pub output: Option<String>,
36 pub parent_instance_id: Option<String>,
37 pub pinned_duroxide_version_packed: Option<i64>,
39
40 pub custom_status: Option<String>,
41 pub custom_status_version: u64,
42
43 pub lock_token: Option<String>,
44 pub locked_until: Option<u64>,
45
46 pub created_at: u64,
47 pub updated_at: u64,
48
49 #[serde(rename = "_etag", default, skip_serializing)]
51 pub etag: Option<String>,
52 #[serde(rename = "_rid", default, skip_serializing)]
53 pub rid: Option<String>,
54 #[serde(rename = "_self", default, skip_serializing)]
55 pub self_link: Option<String>,
56 #[serde(rename = "_ts", default, skip_serializing)]
57 pub ts: Option<u64>,
58 #[serde(rename = "_attachments", default, skip_serializing)]
59 pub attachments: Option<String>,
60}
61
62impl InstanceDocument {
63 pub fn doc_id(instance_id: &str) -> String {
64 format!("{instance_id}:instance")
65 }
66
67 pub fn new(
68 instance_id: &str,
69 orchestration_name: &str,
70 orchestration_version: &str,
71 execution_id: u64,
72 parent_instance_id: Option<&str>,
73 now_ms: u64,
74 ) -> Self {
75 Self {
76 id: Self::doc_id(instance_id),
77 instance_id: instance_id.to_string(),
78 doc_type: DOC_TYPE_INSTANCE.to_string(),
79 orchestration_name: orchestration_name.to_string(),
80 orchestration_version: orchestration_version.to_string(),
81 current_execution_id: execution_id,
82 status: "Running".to_string(),
83 output: None,
84 parent_instance_id: parent_instance_id.map(|s| s.to_string()),
85 pinned_duroxide_version_packed: None,
86 custom_status: None,
87 custom_status_version: 0,
88 lock_token: None,
89 locked_until: None,
90 created_at: now_ms,
91 updated_at: now_ms,
92 etag: None,
93 rid: None,
94 self_link: None,
95 ts: None,
96 attachments: None,
97 }
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct HistoryDocument {
111 pub id: String,
112 pub instance_id: String,
113 #[serde(rename = "type")]
114 pub doc_type: String,
115 pub execution_id: u64,
116 pub event_id: u64,
117 pub event_data: String,
118}
119
120impl HistoryDocument {
121 pub fn doc_id(instance_id: &str, execution_id: u64, event_id: u64) -> String {
122 format!("{instance_id}:history:{execution_id}:{event_id}")
123 }
124
125 pub fn new(instance_id: &str, execution_id: u64, event_id: u64, event_data: String) -> Self {
126 Self {
127 id: Self::doc_id(instance_id, execution_id, event_id),
128 instance_id: instance_id.to_string(),
129 doc_type: DOC_TYPE_HISTORY.to_string(),
130 execution_id,
131 event_id,
132 event_data,
133 }
134 }
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct QueueItemDocument {
147 pub id: String,
148 pub instance_id: String,
149 #[serde(rename = "type")]
150 pub doc_type: String,
151
152 pub work_item: String,
153 pub dispatch_slot: u8,
154
155 pub visible_at: u64,
156 pub enqueued_at: u64,
157
158 pub lock_token: Option<String>,
159 pub locked_until: Option<u64>,
160 pub attempt_count: i32,
161
162 pub execution_id: Option<u64>,
164 pub activity_id: Option<u64>,
165 pub session_id: Option<String>,
166 #[serde(skip_serializing_if = "Option::is_none")]
168 #[serde(default)]
169 pub tag: Option<String>,
170
171 #[serde(rename = "_etag", default, skip_serializing)]
173 pub etag: Option<String>,
174 #[serde(rename = "_rid", default, skip_serializing)]
175 pub rid: Option<String>,
176 #[serde(rename = "_self", default, skip_serializing)]
177 pub self_link: Option<String>,
178 #[serde(rename = "_ts", default, skip_serializing)]
179 pub ts: Option<u64>,
180 #[serde(rename = "_attachments", default, skip_serializing)]
181 pub attachments: Option<String>,
182}
183
184impl QueueItemDocument {
185 pub fn new_orch_queue(
186 instance_id: &str,
187 work_item_json: String,
188 visible_at: u64,
189 now_ms: u64,
190 ) -> Self {
191 Self {
192 id: uuid::Uuid::new_v4().to_string(),
193 instance_id: instance_id.to_string(),
194 doc_type: DOC_TYPE_ORCH_QUEUE.to_string(),
195 work_item: work_item_json,
196 dispatch_slot: dispatch_slot(instance_id),
197 visible_at,
198 enqueued_at: now_ms,
199 lock_token: None,
200 locked_until: None,
201 attempt_count: 0,
202 execution_id: None,
203 activity_id: None,
204 session_id: None,
205 tag: None,
206 etag: None,
207 rid: None,
208 self_link: None,
209 ts: None,
210 attachments: None,
211 }
212 }
213
214 pub fn new_worker_queue(
215 instance_id: &str,
216 work_item_json: String,
217 execution_id: Option<u64>,
218 activity_id: Option<u64>,
219 session_id: Option<String>,
220 tag: Option<String>,
221 now_ms: u64,
222 ) -> Self {
223 Self {
224 id: uuid::Uuid::new_v4().to_string(),
225 instance_id: instance_id.to_string(),
226 doc_type: DOC_TYPE_WORKER_QUEUE.to_string(),
227 work_item: work_item_json,
228 dispatch_slot: dispatch_slot(instance_id),
229 visible_at: now_ms,
230 enqueued_at: now_ms,
231 lock_token: None,
232 locked_until: None,
233 attempt_count: 0,
234 execution_id,
235 activity_id,
236 session_id,
237 tag,
238 etag: None,
239 rid: None,
240 self_link: None,
241 ts: None,
242 attachments: None,
243 }
244 }
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
255#[serde(rename_all = "camelCase")]
256pub struct OutboxIntentDocument {
257 pub id: String,
258 pub instance_id: String,
259 #[serde(rename = "type")]
260 pub doc_type: String,
261
262 pub target_instance_id: String,
263 pub target_document_type: String,
264 pub payload: String,
265 pub idempotency_key: String,
266
267 pub status: String,
268 pub created_at: u64,
269 pub attempt_count: i32,
270 pub last_attempt_at: Option<u64>,
271}
272
273impl OutboxIntentDocument {
274 pub fn new(
275 source_instance_id: &str,
276 target_instance_id: &str,
277 target_doc_type: &str,
278 payload: String,
279 idempotency_key: String,
280 now_ms: u64,
281 ) -> Self {
282 Self {
283 id: format!("intent:{idempotency_key}"),
284 instance_id: source_instance_id.to_string(),
285 doc_type: DOC_TYPE_OUTBOX_INTENT.to_string(),
286 target_instance_id: target_instance_id.to_string(),
287 target_document_type: target_doc_type.to_string(),
288 payload,
289 idempotency_key,
290 status: "pending".to_string(),
291 created_at: now_ms,
292 attempt_count: 0,
293 last_attempt_at: None,
294 }
295 }
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
306#[serde(rename_all = "camelCase")]
307pub struct KeyValueDocument {
308 pub id: String,
309 pub instance_id: String,
310 #[serde(rename = "type")]
311 pub doc_type: String,
312
313 pub key: String,
314 pub value: String,
315 pub execution_id: u64,
316 #[serde(default)]
317 pub last_updated_at_ms: u64,
318
319 #[serde(rename = "_etag", default, skip_serializing)]
320 pub etag: Option<String>,
321 #[serde(rename = "_rid", default, skip_serializing)]
322 pub rid: Option<String>,
323 #[serde(rename = "_self", default, skip_serializing)]
324 pub self_link: Option<String>,
325 #[serde(rename = "_ts", default, skip_serializing)]
326 pub ts: Option<u64>,
327 #[serde(rename = "_attachments", default, skip_serializing)]
328 pub attachments: Option<String>,
329}
330
331impl KeyValueDocument {
332 pub fn doc_id(instance_id: &str, key: &str) -> String {
333 format!("{instance_id}:kv:{key}")
334 }
335
336 pub fn new(
337 instance_id: &str,
338 key: &str,
339 value: &str,
340 execution_id: u64,
341 last_updated_at_ms: u64,
342 ) -> Self {
343 Self {
344 id: Self::doc_id(instance_id, key),
345 instance_id: instance_id.to_string(),
346 doc_type: DOC_TYPE_KV.to_string(),
347 key: key.to_string(),
348 value: value.to_string(),
349 execution_id,
350 last_updated_at_ms,
351 etag: None,
352 rid: None,
353 self_link: None,
354 ts: None,
355 attachments: None,
356 }
357 }
358}
359
360#[derive(Debug, Clone, Serialize, Deserialize)]
364#[serde(rename_all = "camelCase")]
365pub struct KeyValueDeltaDocument {
366 pub id: String,
367 pub instance_id: String,
368 #[serde(rename = "type")]
369 pub doc_type: String,
370
371 pub key: String,
372 #[serde(default)]
373 pub value: Option<String>,
374 pub execution_id: u64,
375 #[serde(default)]
376 pub last_updated_at_ms: u64,
377
378 #[serde(rename = "_etag", default, skip_serializing)]
379 pub etag: Option<String>,
380 #[serde(rename = "_rid", default, skip_serializing)]
381 pub rid: Option<String>,
382 #[serde(rename = "_self", default, skip_serializing)]
383 pub self_link: Option<String>,
384 #[serde(rename = "_ts", default, skip_serializing)]
385 pub ts: Option<u64>,
386 #[serde(rename = "_attachments", default, skip_serializing)]
387 pub attachments: Option<String>,
388}
389
390impl KeyValueDeltaDocument {
391 pub fn doc_id(instance_id: &str, key: &str) -> String {
392 format!("{instance_id}:kv_delta:{key}")
393 }
394
395 pub fn new(
396 instance_id: &str,
397 key: &str,
398 value: Option<String>,
399 execution_id: u64,
400 last_updated_at_ms: u64,
401 ) -> Self {
402 Self {
403 id: Self::doc_id(instance_id, key),
404 instance_id: instance_id.to_string(),
405 doc_type: DOC_TYPE_KV_DELTA.to_string(),
406 key: key.to_string(),
407 value,
408 execution_id,
409 last_updated_at_ms,
410 etag: None,
411 rid: None,
412 self_link: None,
413 ts: None,
414 attachments: None,
415 }
416 }
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
427#[serde(rename_all = "camelCase")]
428pub struct SessionDocument {
429 pub id: String,
430 pub instance_id: String,
431 #[serde(rename = "type")]
432 pub doc_type: String,
433
434 pub session_id: String,
435 pub owner_id: String,
436 pub locked_until: u64,
437 pub last_activity: u64,
438 pub created_at: u64,
439
440 #[serde(rename = "_etag", default, skip_serializing)]
441 pub etag: Option<String>,
442 #[serde(rename = "_rid", default, skip_serializing)]
443 pub rid: Option<String>,
444 #[serde(rename = "_self", default, skip_serializing)]
445 pub self_link: Option<String>,
446 #[serde(rename = "_ts", default, skip_serializing)]
447 pub ts: Option<u64>,
448 #[serde(rename = "_attachments", default, skip_serializing)]
449 pub attachments: Option<String>,
450}
451
452impl SessionDocument {
453 pub fn doc_id(instance_id: &str, session_id: &str) -> String {
454 format!("{instance_id}:session:{session_id}")
455 }
456}
457
458pub fn dispatch_slot(instance_id: &str) -> u8 {
465 use std::hash::{Hash, Hasher};
466 let mut hasher = std::collections::hash_map::DefaultHasher::new();
467 instance_id.hash(&mut hasher);
468 (hasher.finish() % 256) as u8
469}
470
471pub fn pack_semver(v: &semver::Version) -> i64 {
474 (v.major as i64) * 1_000_000 + (v.minor as i64) * 1_000 + (v.patch as i64)
475}
476
477pub fn unpack_semver(packed: i64) -> semver::Version {
479 let major = (packed / 1_000_000) as u64;
480 let minor = ((packed % 1_000_000) / 1_000) as u64;
481 let patch = (packed % 1_000) as u64;
482 semver::Version::new(major, minor, patch)
483}
484
485pub fn now_ms() -> u64 {
487 std::time::SystemTime::now()
488 .duration_since(std::time::UNIX_EPOCH)
489 .unwrap()
490 .as_millis() as u64
491}
492
493pub fn task_id_u64() -> u64 {
496 use std::hash::{Hash, Hasher};
497 let mut hasher = std::collections::hash_map::DefaultHasher::new();
498 if let Some(id) = tokio::task::try_id() {
499 id.hash(&mut hasher);
500 } else {
501 std::thread::current().id().hash(&mut hasher);
503 }
504 hasher.finish()
505}
506
507pub fn work_item_instance(item: &duroxide::providers::WorkItem) -> &str {
509 use duroxide::providers::WorkItem;
510 match item {
511 WorkItem::StartOrchestration { instance, .. } => instance,
512 WorkItem::ActivityExecute { instance, .. } => instance,
513 WorkItem::ActivityCompleted { instance, .. } => instance,
514 WorkItem::ActivityFailed { instance, .. } => instance,
515 WorkItem::TimerFired { instance, .. } => instance,
516 WorkItem::ExternalRaised { instance, .. } => instance,
517 WorkItem::SubOrchCompleted {
518 parent_instance, ..
519 } => parent_instance,
520 WorkItem::SubOrchFailed {
521 parent_instance, ..
522 } => parent_instance,
523 WorkItem::CancelInstance { instance, .. } => instance,
524 WorkItem::ContinueAsNew { instance, .. } => instance,
525 WorkItem::QueueMessage { instance, .. } => instance,
526 }
527}
528
529pub fn idempotency_key(source_instance: &str, execution_id: u64, sequence: u64) -> String {
531 format!("{source_instance}:{execution_id}:{sequence}")
532}