Skip to main content

duroxide_cdb/
models.rs

1use serde::{Deserialize, Serialize};
2
3// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
4// Document type discriminator
5// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
6
7pub const DOC_TYPE_INSTANCE: &str = "instance";
8pub const DOC_TYPE_HISTORY: &str = "history";
9pub const DOC_TYPE_ORCH_QUEUE: &str = "orch_queue";
10pub const DOC_TYPE_WORKER_QUEUE: &str = "worker_queue";
11pub const DOC_TYPE_OUTBOX_INTENT: &str = "outbox_intent";
12pub const DOC_TYPE_SESSION: &str = "session";
13pub const DOC_TYPE_KV: &str = "kv";
14pub const DOC_TYPE_KV_DELTA: &str = "kv_delta";
15
16// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
17// Instance document
18// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
19
20/// Instance metadata document.
21/// id = "<instanceId>:instance"
22/// Partition key = instanceId
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25pub struct InstanceDocument {
26    pub id: String,
27    pub instance_id: String,
28    #[serde(rename = "type")]
29    pub doc_type: String,
30
31    pub orchestration_name: String,
32    pub orchestration_version: String,
33    pub current_execution_id: u64,
34    pub status: String,
35    pub output: Option<String>,
36    pub parent_instance_id: Option<String>,
37    /// Packed semver: major*1_000_000 + minor*1_000 + patch
38    pub pinned_duroxide_version_packed: Option<i64>,
39
40    pub custom_status: Option<String>,
41    pub custom_status_version: u64,
42
43    pub lock_token: Option<String>,
44    pub locked_until: Option<u64>,
45
46    pub created_at: u64,
47    pub updated_at: u64,
48
49    /// CosmosDB system field for optimistic concurrency
50    #[serde(rename = "_etag", default, skip_serializing)]
51    pub etag: Option<String>,
52    #[serde(rename = "_rid", default, skip_serializing)]
53    pub rid: Option<String>,
54    #[serde(rename = "_self", default, skip_serializing)]
55    pub self_link: Option<String>,
56    #[serde(rename = "_ts", default, skip_serializing)]
57    pub ts: Option<u64>,
58    #[serde(rename = "_attachments", default, skip_serializing)]
59    pub attachments: Option<String>,
60}
61
62impl InstanceDocument {
63    pub fn doc_id(instance_id: &str) -> String {
64        format!("{instance_id}:instance")
65    }
66
67    pub fn new(
68        instance_id: &str,
69        orchestration_name: &str,
70        orchestration_version: &str,
71        execution_id: u64,
72        parent_instance_id: Option<&str>,
73        now_ms: u64,
74    ) -> Self {
75        Self {
76            id: Self::doc_id(instance_id),
77            instance_id: instance_id.to_string(),
78            doc_type: DOC_TYPE_INSTANCE.to_string(),
79            orchestration_name: orchestration_name.to_string(),
80            orchestration_version: orchestration_version.to_string(),
81            current_execution_id: execution_id,
82            status: "Running".to_string(),
83            output: None,
84            parent_instance_id: parent_instance_id.map(|s| s.to_string()),
85            pinned_duroxide_version_packed: None,
86            custom_status: None,
87            custom_status_version: 0,
88            lock_token: None,
89            locked_until: None,
90            created_at: now_ms,
91            updated_at: now_ms,
92            etag: None,
93            rid: None,
94            self_link: None,
95            ts: None,
96            attachments: None,
97        }
98    }
99}
100
101// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
102// History event document
103// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
104
105/// History event document.
106/// id = "<instanceId>:history:<executionId>:<eventId>"
107/// Partition key = instanceId
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct HistoryDocument {
111    pub id: String,
112    pub instance_id: String,
113    #[serde(rename = "type")]
114    pub doc_type: String,
115    pub execution_id: u64,
116    pub event_id: u64,
117    pub event_data: String,
118}
119
120impl HistoryDocument {
121    pub fn doc_id(instance_id: &str, execution_id: u64, event_id: u64) -> String {
122        format!("{instance_id}:history:{execution_id}:{event_id}")
123    }
124
125    pub fn new(instance_id: &str, execution_id: u64, event_id: u64, event_data: String) -> Self {
126        Self {
127            id: Self::doc_id(instance_id, execution_id, event_id),
128            instance_id: instance_id.to_string(),
129            doc_type: DOC_TYPE_HISTORY.to_string(),
130            execution_id,
131            event_id,
132            event_data,
133        }
134    }
135}
136
137// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
138// Queue item document (shared between orch_queue and worker_queue)
139// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
140
141/// Queue item document (orchestrator or worker).
142/// id = random UUID
143/// Partition key = instanceId
144#[derive(Debug, Clone, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct QueueItemDocument {
147    pub id: String,
148    pub instance_id: String,
149    #[serde(rename = "type")]
150    pub doc_type: String,
151
152    pub work_item: String,
153    pub dispatch_slot: u8,
154
155    pub visible_at: u64,
156    pub enqueued_at: u64,
157
158    pub lock_token: Option<String>,
159    pub locked_until: Option<u64>,
160    pub attempt_count: i32,
161
162    /// For worker queue items: execution context
163    pub execution_id: Option<u64>,
164    pub activity_id: Option<u64>,
165    pub session_id: Option<String>,
166    /// Activity routing tag (None = untagged/default)
167    #[serde(skip_serializing_if = "Option::is_none")]
168    #[serde(default)]
169    pub tag: Option<String>,
170
171    /// CosmosDB system fields
172    #[serde(rename = "_etag", default, skip_serializing)]
173    pub etag: Option<String>,
174    #[serde(rename = "_rid", default, skip_serializing)]
175    pub rid: Option<String>,
176    #[serde(rename = "_self", default, skip_serializing)]
177    pub self_link: Option<String>,
178    #[serde(rename = "_ts", default, skip_serializing)]
179    pub ts: Option<u64>,
180    #[serde(rename = "_attachments", default, skip_serializing)]
181    pub attachments: Option<String>,
182}
183
184impl QueueItemDocument {
185    pub fn new_orch_queue(
186        instance_id: &str,
187        work_item_json: String,
188        visible_at: u64,
189        now_ms: u64,
190    ) -> Self {
191        Self {
192            id: uuid::Uuid::new_v4().to_string(),
193            instance_id: instance_id.to_string(),
194            doc_type: DOC_TYPE_ORCH_QUEUE.to_string(),
195            work_item: work_item_json,
196            dispatch_slot: dispatch_slot(instance_id),
197            visible_at,
198            enqueued_at: now_ms,
199            lock_token: None,
200            locked_until: None,
201            attempt_count: 0,
202            execution_id: None,
203            activity_id: None,
204            session_id: None,
205            tag: None,
206            etag: None,
207            rid: None,
208            self_link: None,
209            ts: None,
210            attachments: None,
211        }
212    }
213
214    pub fn new_worker_queue(
215        instance_id: &str,
216        work_item_json: String,
217        execution_id: Option<u64>,
218        activity_id: Option<u64>,
219        session_id: Option<String>,
220        tag: Option<String>,
221        now_ms: u64,
222    ) -> Self {
223        Self {
224            id: uuid::Uuid::new_v4().to_string(),
225            instance_id: instance_id.to_string(),
226            doc_type: DOC_TYPE_WORKER_QUEUE.to_string(),
227            work_item: work_item_json,
228            dispatch_slot: dispatch_slot(instance_id),
229            visible_at: now_ms,
230            enqueued_at: now_ms,
231            lock_token: None,
232            locked_until: None,
233            attempt_count: 0,
234            execution_id,
235            activity_id,
236            session_id,
237            tag,
238            etag: None,
239            rid: None,
240            self_link: None,
241            ts: None,
242            attachments: None,
243        }
244    }
245}
246
247// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
248// Outbox intent document
249// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
250
251/// Outbox intent for cross-partition writes.
252/// id = "intent:<idempotencyKey>"
253/// Partition key = sourceInstanceId
254#[derive(Debug, Clone, Serialize, Deserialize)]
255#[serde(rename_all = "camelCase")]
256pub struct OutboxIntentDocument {
257    pub id: String,
258    pub instance_id: String,
259    #[serde(rename = "type")]
260    pub doc_type: String,
261
262    pub target_instance_id: String,
263    pub target_document_type: String,
264    pub payload: String,
265    pub idempotency_key: String,
266
267    pub status: String,
268    pub created_at: u64,
269    pub attempt_count: i32,
270    pub last_attempt_at: Option<u64>,
271}
272
273impl OutboxIntentDocument {
274    pub fn new(
275        source_instance_id: &str,
276        target_instance_id: &str,
277        target_doc_type: &str,
278        payload: String,
279        idempotency_key: String,
280        now_ms: u64,
281    ) -> Self {
282        Self {
283            id: format!("intent:{idempotency_key}"),
284            instance_id: source_instance_id.to_string(),
285            doc_type: DOC_TYPE_OUTBOX_INTENT.to_string(),
286            target_instance_id: target_instance_id.to_string(),
287            target_document_type: target_doc_type.to_string(),
288            payload,
289            idempotency_key,
290            status: "pending".to_string(),
291            created_at: now_ms,
292            attempt_count: 0,
293            last_attempt_at: None,
294        }
295    }
296}
297
298// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
299// KV documents
300// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
301
302/// Materialized KV entry for an orchestration instance.
303/// id = "<instanceId>:kv:<key>"
304/// Partition key = instanceId
305#[derive(Debug, Clone, Serialize, Deserialize)]
306#[serde(rename_all = "camelCase")]
307pub struct KeyValueDocument {
308    pub id: String,
309    pub instance_id: String,
310    #[serde(rename = "type")]
311    pub doc_type: String,
312
313    pub key: String,
314    pub value: String,
315    pub execution_id: u64,
316    #[serde(default)]
317    pub last_updated_at_ms: u64,
318
319    #[serde(rename = "_etag", default, skip_serializing)]
320    pub etag: Option<String>,
321    #[serde(rename = "_rid", default, skip_serializing)]
322    pub rid: Option<String>,
323    #[serde(rename = "_self", default, skip_serializing)]
324    pub self_link: Option<String>,
325    #[serde(rename = "_ts", default, skip_serializing)]
326    pub ts: Option<u64>,
327    #[serde(rename = "_attachments", default, skip_serializing)]
328    pub attachments: Option<String>,
329}
330
331impl KeyValueDocument {
332    pub fn doc_id(instance_id: &str, key: &str) -> String {
333        format!("{instance_id}:kv:{key}")
334    }
335
336    pub fn new(
337        instance_id: &str,
338        key: &str,
339        value: &str,
340        execution_id: u64,
341        last_updated_at_ms: u64,
342    ) -> Self {
343        Self {
344            id: Self::doc_id(instance_id, key),
345            instance_id: instance_id.to_string(),
346            doc_type: DOC_TYPE_KV.to_string(),
347            key: key.to_string(),
348            value: value.to_string(),
349            execution_id,
350            last_updated_at_ms,
351            etag: None,
352            rid: None,
353            self_link: None,
354            ts: None,
355            attachments: None,
356        }
357    }
358}
359
360/// Current-execution KV mutation entry.
361/// id = "<instanceId>:kv_delta:<key>"
362/// Partition key = instanceId
363#[derive(Debug, Clone, Serialize, Deserialize)]
364#[serde(rename_all = "camelCase")]
365pub struct KeyValueDeltaDocument {
366    pub id: String,
367    pub instance_id: String,
368    #[serde(rename = "type")]
369    pub doc_type: String,
370
371    pub key: String,
372    #[serde(default)]
373    pub value: Option<String>,
374    pub execution_id: u64,
375    #[serde(default)]
376    pub last_updated_at_ms: u64,
377
378    #[serde(rename = "_etag", default, skip_serializing)]
379    pub etag: Option<String>,
380    #[serde(rename = "_rid", default, skip_serializing)]
381    pub rid: Option<String>,
382    #[serde(rename = "_self", default, skip_serializing)]
383    pub self_link: Option<String>,
384    #[serde(rename = "_ts", default, skip_serializing)]
385    pub ts: Option<u64>,
386    #[serde(rename = "_attachments", default, skip_serializing)]
387    pub attachments: Option<String>,
388}
389
390impl KeyValueDeltaDocument {
391    pub fn doc_id(instance_id: &str, key: &str) -> String {
392        format!("{instance_id}:kv_delta:{key}")
393    }
394
395    pub fn new(
396        instance_id: &str,
397        key: &str,
398        value: Option<String>,
399        execution_id: u64,
400        last_updated_at_ms: u64,
401    ) -> Self {
402        Self {
403            id: Self::doc_id(instance_id, key),
404            instance_id: instance_id.to_string(),
405            doc_type: DOC_TYPE_KV_DELTA.to_string(),
406            key: key.to_string(),
407            value,
408            execution_id,
409            last_updated_at_ms,
410            etag: None,
411            rid: None,
412            self_link: None,
413            ts: None,
414            attachments: None,
415        }
416    }
417}
418
419// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
420// Session document
421// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
422
423/// Session affinity tracking document.
424/// id = "<instanceId>:session:<sessionId>"
425/// Partition key = instanceId
426#[derive(Debug, Clone, Serialize, Deserialize)]
427#[serde(rename_all = "camelCase")]
428pub struct SessionDocument {
429    pub id: String,
430    pub instance_id: String,
431    #[serde(rename = "type")]
432    pub doc_type: String,
433
434    pub session_id: String,
435    pub owner_id: String,
436    pub locked_until: u64,
437    pub last_activity: u64,
438    pub created_at: u64,
439
440    #[serde(rename = "_etag", default, skip_serializing)]
441    pub etag: Option<String>,
442    #[serde(rename = "_rid", default, skip_serializing)]
443    pub rid: Option<String>,
444    #[serde(rename = "_self", default, skip_serializing)]
445    pub self_link: Option<String>,
446    #[serde(rename = "_ts", default, skip_serializing)]
447    pub ts: Option<u64>,
448    #[serde(rename = "_attachments", default, skip_serializing)]
449    pub attachments: Option<String>,
450}
451
452impl SessionDocument {
453    pub fn doc_id(instance_id: &str, session_id: &str) -> String {
454        format!("{instance_id}:session:{session_id}")
455    }
456}
457
458// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
459// Utility functions
460// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
461
462/// Compute the dispatch slot for an instance ID.
463/// Returns a value 0-255 for partitioning across dispatchers.
464pub fn dispatch_slot(instance_id: &str) -> u8 {
465    use std::hash::{Hash, Hasher};
466    let mut hasher = std::collections::hash_map::DefaultHasher::new();
467    instance_id.hash(&mut hasher);
468    (hasher.finish() % 256) as u8
469}
470
471/// Pack a semver::Version into an i64 for range comparison in queries.
472///   packed = major * 1_000_000 + minor * 1_000 + patch
473pub fn pack_semver(v: &semver::Version) -> i64 {
474    (v.major as i64) * 1_000_000 + (v.minor as i64) * 1_000 + (v.patch as i64)
475}
476
477/// Unpack an i64 back into semver::Version.
478pub fn unpack_semver(packed: i64) -> semver::Version {
479    let major = (packed / 1_000_000) as u64;
480    let minor = ((packed % 1_000_000) / 1_000) as u64;
481    let patch = (packed % 1_000) as u64;
482    semver::Version::new(major, minor, patch)
483}
484
485/// Get current time in milliseconds since epoch.
486pub fn now_ms() -> u64 {
487    std::time::SystemTime::now()
488        .duration_since(std::time::UNIX_EPOCH)
489        .unwrap()
490        .as_millis() as u64
491}
492
493/// Get a u64 value representing the current tokio task ID.
494/// Used for lease slot allocation.
495pub fn task_id_u64() -> u64 {
496    use std::hash::{Hash, Hasher};
497    let mut hasher = std::collections::hash_map::DefaultHasher::new();
498    if let Some(id) = tokio::task::try_id() {
499        id.hash(&mut hasher);
500    } else {
501        // Fallback: use thread ID when not inside a tokio task
502        std::thread::current().id().hash(&mut hasher);
503    }
504    hasher.finish()
505}
506
507/// Extract the instance ID from a WorkItem.
508pub fn work_item_instance(item: &duroxide::providers::WorkItem) -> &str {
509    use duroxide::providers::WorkItem;
510    match item {
511        WorkItem::StartOrchestration { instance, .. } => instance,
512        WorkItem::ActivityExecute { instance, .. } => instance,
513        WorkItem::ActivityCompleted { instance, .. } => instance,
514        WorkItem::ActivityFailed { instance, .. } => instance,
515        WorkItem::TimerFired { instance, .. } => instance,
516        WorkItem::ExternalRaised { instance, .. } => instance,
517        WorkItem::SubOrchCompleted {
518            parent_instance, ..
519        } => parent_instance,
520        WorkItem::SubOrchFailed {
521            parent_instance, ..
522        } => parent_instance,
523        WorkItem::CancelInstance { instance, .. } => instance,
524        WorkItem::ContinueAsNew { instance, .. } => instance,
525        WorkItem::QueueMessage { instance, .. } => instance,
526    }
527}
528
529/// Build an idempotency key from the source context.
530pub fn idempotency_key(source_instance: &str, execution_id: u64, sequence: u64) -> String {
531    format!("{source_instance}:{execution_id}:{sequence}")
532}