use serde::{Deserialize, Serialize};
pub const DOC_TYPE_INSTANCE: &str = "instance";
pub const DOC_TYPE_HISTORY: &str = "history";
pub const DOC_TYPE_ORCH_QUEUE: &str = "orch_queue";
pub const DOC_TYPE_WORKER_QUEUE: &str = "worker_queue";
pub const DOC_TYPE_OUTBOX_INTENT: &str = "outbox_intent";
pub const DOC_TYPE_SESSION: &str = "session";
pub const DOC_TYPE_KV: &str = "kv";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InstanceDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub orchestration_name: String,
pub orchestration_version: String,
pub current_execution_id: u64,
pub status: String,
pub output: Option<String>,
pub parent_instance_id: Option<String>,
pub pinned_duroxide_version_packed: Option<i64>,
pub custom_status: Option<String>,
pub custom_status_version: u64,
pub lock_token: Option<String>,
pub locked_until: Option<u64>,
pub created_at: u64,
pub updated_at: u64,
#[serde(rename = "_etag", default, skip_serializing)]
pub etag: Option<String>,
#[serde(rename = "_rid", default, skip_serializing)]
pub rid: Option<String>,
#[serde(rename = "_self", default, skip_serializing)]
pub self_link: Option<String>,
#[serde(rename = "_ts", default, skip_serializing)]
pub ts: Option<u64>,
#[serde(rename = "_attachments", default, skip_serializing)]
pub attachments: Option<String>,
}
impl InstanceDocument {
pub fn doc_id(instance_id: &str) -> String {
format!("{instance_id}:instance")
}
pub fn new(
instance_id: &str,
orchestration_name: &str,
orchestration_version: &str,
execution_id: u64,
parent_instance_id: Option<&str>,
now_ms: u64,
) -> Self {
Self {
id: Self::doc_id(instance_id),
instance_id: instance_id.to_string(),
doc_type: DOC_TYPE_INSTANCE.to_string(),
orchestration_name: orchestration_name.to_string(),
orchestration_version: orchestration_version.to_string(),
current_execution_id: execution_id,
status: "Running".to_string(),
output: None,
parent_instance_id: parent_instance_id.map(|s| s.to_string()),
pinned_duroxide_version_packed: None,
custom_status: None,
custom_status_version: 0,
lock_token: None,
locked_until: None,
created_at: now_ms,
updated_at: now_ms,
etag: None,
rid: None,
self_link: None,
ts: None,
attachments: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HistoryDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub execution_id: u64,
pub event_id: u64,
pub event_data: String,
}
impl HistoryDocument {
pub fn doc_id(instance_id: &str, execution_id: u64, event_id: u64) -> String {
format!("{instance_id}:history:{execution_id}:{event_id}")
}
pub fn new(instance_id: &str, execution_id: u64, event_id: u64, event_data: String) -> Self {
Self {
id: Self::doc_id(instance_id, execution_id, event_id),
instance_id: instance_id.to_string(),
doc_type: DOC_TYPE_HISTORY.to_string(),
execution_id,
event_id,
event_data,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueItemDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub work_item: String,
pub dispatch_slot: u8,
pub visible_at: u64,
pub enqueued_at: u64,
pub lock_token: Option<String>,
pub locked_until: Option<u64>,
pub attempt_count: i32,
pub execution_id: Option<u64>,
pub activity_id: Option<u64>,
pub session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub tag: Option<String>,
#[serde(rename = "_etag", default, skip_serializing)]
pub etag: Option<String>,
#[serde(rename = "_rid", default, skip_serializing)]
pub rid: Option<String>,
#[serde(rename = "_self", default, skip_serializing)]
pub self_link: Option<String>,
#[serde(rename = "_ts", default, skip_serializing)]
pub ts: Option<u64>,
#[serde(rename = "_attachments", default, skip_serializing)]
pub attachments: Option<String>,
}
impl QueueItemDocument {
pub fn new_orch_queue(
instance_id: &str,
work_item_json: String,
visible_at: u64,
now_ms: u64,
) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
instance_id: instance_id.to_string(),
doc_type: DOC_TYPE_ORCH_QUEUE.to_string(),
work_item: work_item_json,
dispatch_slot: dispatch_slot(instance_id),
visible_at,
enqueued_at: now_ms,
lock_token: None,
locked_until: None,
attempt_count: 0,
execution_id: None,
activity_id: None,
session_id: None,
tag: None,
etag: None,
rid: None,
self_link: None,
ts: None,
attachments: None,
}
}
pub fn new_worker_queue(
instance_id: &str,
work_item_json: String,
execution_id: Option<u64>,
activity_id: Option<u64>,
session_id: Option<String>,
tag: Option<String>,
now_ms: u64,
) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
instance_id: instance_id.to_string(),
doc_type: DOC_TYPE_WORKER_QUEUE.to_string(),
work_item: work_item_json,
dispatch_slot: dispatch_slot(instance_id),
visible_at: now_ms,
enqueued_at: now_ms,
lock_token: None,
locked_until: None,
attempt_count: 0,
execution_id,
activity_id,
session_id,
tag,
etag: None,
rid: None,
self_link: None,
ts: None,
attachments: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OutboxIntentDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub target_instance_id: String,
pub target_document_type: String,
pub payload: String,
pub idempotency_key: String,
pub status: String,
pub created_at: u64,
pub attempt_count: i32,
pub last_attempt_at: Option<u64>,
}
impl OutboxIntentDocument {
pub fn new(
source_instance_id: &str,
target_instance_id: &str,
target_doc_type: &str,
payload: String,
idempotency_key: String,
now_ms: u64,
) -> Self {
Self {
id: format!("intent:{idempotency_key}"),
instance_id: source_instance_id.to_string(),
doc_type: DOC_TYPE_OUTBOX_INTENT.to_string(),
target_instance_id: target_instance_id.to_string(),
target_document_type: target_doc_type.to_string(),
payload,
idempotency_key,
status: "pending".to_string(),
created_at: now_ms,
attempt_count: 0,
last_attempt_at: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KeyValueDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub key: String,
pub value: String,
pub execution_id: u64,
#[serde(default)]
pub last_updated_at_ms: u64,
#[serde(rename = "_etag", default, skip_serializing)]
pub etag: Option<String>,
#[serde(rename = "_rid", default, skip_serializing)]
pub rid: Option<String>,
#[serde(rename = "_self", default, skip_serializing)]
pub self_link: Option<String>,
#[serde(rename = "_ts", default, skip_serializing)]
pub ts: Option<u64>,
#[serde(rename = "_attachments", default, skip_serializing)]
pub attachments: Option<String>,
}
impl KeyValueDocument {
pub fn doc_id(instance_id: &str, key: &str) -> String {
format!("{instance_id}:kv:{key}")
}
pub fn new(
instance_id: &str,
key: &str,
value: &str,
execution_id: u64,
last_updated_at_ms: u64,
) -> Self {
Self {
id: Self::doc_id(instance_id, key),
instance_id: instance_id.to_string(),
doc_type: DOC_TYPE_KV.to_string(),
key: key.to_string(),
value: value.to_string(),
execution_id,
last_updated_at_ms,
etag: None,
rid: None,
self_link: None,
ts: None,
attachments: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionDocument {
pub id: String,
pub instance_id: String,
#[serde(rename = "type")]
pub doc_type: String,
pub session_id: String,
pub owner_id: String,
pub locked_until: u64,
pub last_activity: u64,
pub created_at: u64,
#[serde(rename = "_etag", default, skip_serializing)]
pub etag: Option<String>,
#[serde(rename = "_rid", default, skip_serializing)]
pub rid: Option<String>,
#[serde(rename = "_self", default, skip_serializing)]
pub self_link: Option<String>,
#[serde(rename = "_ts", default, skip_serializing)]
pub ts: Option<u64>,
#[serde(rename = "_attachments", default, skip_serializing)]
pub attachments: Option<String>,
}
impl SessionDocument {
pub fn doc_id(instance_id: &str, session_id: &str) -> String {
format!("{instance_id}:session:{session_id}")
}
}
pub fn dispatch_slot(instance_id: &str) -> u8 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
instance_id.hash(&mut hasher);
(hasher.finish() % 256) as u8
}
pub fn pack_semver(v: &semver::Version) -> i64 {
(v.major as i64) * 1_000_000 + (v.minor as i64) * 1_000 + (v.patch as i64)
}
pub fn unpack_semver(packed: i64) -> semver::Version {
let major = (packed / 1_000_000) as u64;
let minor = ((packed % 1_000_000) / 1_000) as u64;
let patch = (packed % 1_000) as u64;
semver::Version::new(major, minor, patch)
}
pub fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
pub fn task_id_u64() -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
if let Some(id) = tokio::task::try_id() {
id.hash(&mut hasher);
} else {
std::thread::current().id().hash(&mut hasher);
}
hasher.finish()
}
pub fn work_item_instance(item: &duroxide::providers::WorkItem) -> &str {
use duroxide::providers::WorkItem;
match item {
WorkItem::StartOrchestration { instance, .. } => instance,
WorkItem::ActivityExecute { instance, .. } => instance,
WorkItem::ActivityCompleted { instance, .. } => instance,
WorkItem::ActivityFailed { instance, .. } => instance,
WorkItem::TimerFired { instance, .. } => instance,
WorkItem::ExternalRaised { instance, .. } => instance,
WorkItem::SubOrchCompleted {
parent_instance, ..
} => parent_instance,
WorkItem::SubOrchFailed {
parent_instance, ..
} => parent_instance,
WorkItem::CancelInstance { instance, .. } => instance,
WorkItem::ContinueAsNew { instance, .. } => instance,
WorkItem::QueueMessage { instance, .. } => instance,
}
}
pub fn idempotency_key(source_instance: &str, execution_id: u64, sequence: u64) -> String {
format!("{source_instance}:{execution_id}:{sequence}")
}