use std::sync::Arc;
use tracing::info;
use crate::_typed_codec::{Codec, Json};
use crate::providers::{
DeleteInstanceResult, ExecutionInfo, InstanceFilter, InstanceInfo, InstanceTree, Provider, ProviderAdmin,
ProviderError, PruneOptions, PruneResult, QueueDepths, SystemMetrics, WorkItem,
};
use crate::{EventKind, OrchestrationStatus};
use serde::Serialize;
/// Errors returned by [`Client`] operations.
#[derive(Debug, Clone)]
pub enum ClientError {
/// An error reported by the underlying provider, wrapped unchanged.
Provider(ProviderError),
/// The provider did not expose a management (`ProviderAdmin`) capability.
ManagementNotAvailable,
/// A value could not be encoded or decoded; `message` carries the codec error text.
InvalidInput { message: String },
/// A polling `wait_*` call reached its deadline without the awaited condition.
Timeout,
/// A delete was refused because the provider reported the instance as still running.
InstanceStillRunning { instance_id: String },
/// A delete was refused because the target is a sub-orchestration.
CannotDeleteSubOrchestration { instance_id: String },
/// The provider reported that the instance was not found.
InstanceNotFound { instance_id: String },
}
impl ClientError {
pub fn is_retryable(&self) -> bool {
match self {
ClientError::Provider(e) => e.is_retryable(),
ClientError::ManagementNotAvailable => false,
ClientError::InvalidInput { .. } => false,
ClientError::Timeout => true,
ClientError::InstanceStillRunning { .. } => false,
ClientError::CannotDeleteSubOrchestration { .. } => false,
ClientError::InstanceNotFound { .. } => false,
}
}
}
impl std::fmt::Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClientError::Provider(e) => write!(f, "{e}"),
ClientError::ManagementNotAvailable => write!(
f,
"Management features not available - provider doesn't implement ProviderAdmin"
),
ClientError::InvalidInput { message } => write!(f, "Invalid input: {message}"),
ClientError::Timeout => write!(f, "Operation timed out"),
ClientError::InstanceStillRunning { instance_id } => write!(
f,
"Instance {instance_id} is still running. Use force=true or cancel first."
),
ClientError::CannotDeleteSubOrchestration { instance_id } => write!(
f,
"Cannot delete sub-orchestration {instance_id} directly. Delete the root orchestration instead."
),
ClientError::InstanceNotFound { instance_id } => {
write!(f, "Instance {instance_id} not found")
}
}
}
}
// Marker impl so `ClientError` can be used wherever `std::error::Error` is expected; no `source()` override is provided.
impl std::error::Error for ClientError {}
impl From<ProviderError> for ClientError {
fn from(e: ProviderError) -> Self {
ClientError::Provider(e)
}
}
/// First delay, in milliseconds, between polls in the back-off based `wait_*` helpers.
const INITIAL_POLL_DELAY_MS: u64 = 5;
/// Upper bound, in milliseconds, that the back-off delay is clamped to.
const MAX_POLL_DELAY_MS: u64 = 100;
/// Factor the delay is multiplied by after each unsuccessful poll.
const POLL_DELAY_MULTIPLIER: u64 = 2;
/// Client handle for starting, signalling, observing and managing orchestrations
/// through a shared [`Provider`].
pub struct Client {
// Shared provider that every operation is forwarded to.
store: Arc<dyn Provider>,
}
impl Client {
pub fn new(store: Arc<dyn Provider>) -> Self {
Self { store }
}
/// Enqueues a `StartOrchestration` work item for `instance`.
///
/// The item carries no version, no parent information, and uses
/// `crate::INITIAL_EXECUTION_ID` as its execution id.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
pub async fn start_orchestration(
    &self,
    instance: impl Into<String>,
    orchestration: impl Into<String>,
    input: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::StartOrchestration {
        instance: instance.into(),
        orchestration: orchestration.into(),
        input: input.into(),
        version: None,
        parent_instance: None,
        parent_id: None,
        execution_id: crate::INITIAL_EXECUTION_ID,
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
/// Enqueues a `StartOrchestration` work item for `instance` with an explicit `version`.
///
/// Apart from the version, the item matches the one built by the
/// unversioned start: no parent information and `crate::INITIAL_EXECUTION_ID`.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
pub async fn start_orchestration_versioned(
    &self,
    instance: impl Into<String>,
    orchestration: impl Into<String>,
    version: impl Into<String>,
    input: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::StartOrchestration {
        instance: instance.into(),
        orchestration: orchestration.into(),
        input: input.into(),
        version: Some(version.into()),
        parent_instance: None,
        parent_id: None,
        execution_id: crate::INITIAL_EXECUTION_ID,
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
/// Starts an orchestration whose input is a serializable value.
///
/// `input` is encoded with the `Json` codec and handed to `start_orchestration`.
///
/// # Errors
/// Returns [`ClientError::InvalidInput`] if encoding fails, otherwise any
/// error from `start_orchestration`.
pub async fn start_orchestration_typed<In: Serialize>(
    &self,
    instance: impl Into<String>,
    orchestration: impl Into<String>,
    input: In,
) -> Result<(), ClientError> {
    let encoded = match Json::encode(&input) {
        Ok(text) => text,
        Err(e) => {
            return Err(ClientError::InvalidInput {
                message: format!("encode: {e}"),
            });
        }
    };
    self.start_orchestration(instance, orchestration, encoded).await
}
/// Starts a specific `version` of an orchestration with a serializable input.
///
/// `input` is encoded with the `Json` codec and handed to
/// `start_orchestration_versioned`.
///
/// # Errors
/// Returns [`ClientError::InvalidInput`] if encoding fails, otherwise any
/// error from `start_orchestration_versioned`.
pub async fn start_orchestration_versioned_typed<In: Serialize>(
    &self,
    instance: impl Into<String>,
    orchestration: impl Into<String>,
    version: impl Into<String>,
    input: In,
) -> Result<(), ClientError> {
    let encoded = match Json::encode(&input) {
        Ok(text) => text,
        Err(e) => {
            return Err(ClientError::InvalidInput {
                message: format!("encode: {e}"),
            });
        }
    };
    self.start_orchestration_versioned(instance, orchestration, version, encoded)
        .await
}
/// Enqueues an `ExternalRaised` work item named `event_name` with payload `data`
/// for `instance`.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
pub async fn raise_event(
    &self,
    instance: impl Into<String>,
    event_name: impl Into<String>,
    data: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::ExternalRaised {
        instance: instance.into(),
        name: event_name.into(),
        data: data.into(),
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
pub async fn raise_event_typed<T: serde::Serialize>(
&self,
instance: impl Into<String>,
event_name: impl Into<String>,
data: &T,
) -> Result<(), ClientError> {
let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
self.raise_event(instance, event_name, payload).await
}
/// Enqueues a `QueueMessage` work item carrying `data` on the named `queue`
/// of `instance`.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
pub async fn enqueue_event(
    &self,
    instance: impl Into<String>,
    queue: impl Into<String>,
    data: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::QueueMessage {
        instance: instance.into(),
        name: queue.into(),
        data: data.into(),
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
pub async fn enqueue_event_typed<T: serde::Serialize>(
&self,
instance: impl Into<String>,
queue: impl Into<String>,
data: &T,
) -> Result<(), ClientError> {
let payload = crate::_typed_codec::Json::encode(data).expect("Serialization should not fail");
self.enqueue_event(instance, queue, payload).await
}
/// Deprecated alias that forwards to `enqueue_event`, using `event_name` as the queue name.
#[deprecated(note = "Use enqueue_event() instead")]
pub async fn raise_event_persistent(
    &self,
    instance: impl Into<String>,
    event_name: impl Into<String>,
    data: impl Into<String>,
) -> Result<(), ClientError> {
    let forwarded = self.enqueue_event(instance, event_name, data);
    forwarded.await
}
/// Enqueues an `ExternalRaised2` work item, which additionally carries a `topic`.
///
/// Only compiled when the `replay-version-test` feature is enabled.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
#[cfg(feature = "replay-version-test")]
pub async fn raise_event2(
    &self,
    instance: impl Into<String>,
    event_name: impl Into<String>,
    topic: impl Into<String>,
    data: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::ExternalRaised2 {
        instance: instance.into(),
        name: event_name.into(),
        topic: topic.into(),
        data: data.into(),
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
/// Enqueues a `CancelInstance` work item for `instance` with the given `reason`.
///
/// # Errors
/// Returns [`ClientError::Provider`] if the provider rejects the enqueue.
pub async fn cancel_instance(
    &self,
    instance: impl Into<String>,
    reason: impl Into<String>,
) -> Result<(), ClientError> {
    let work = WorkItem::CancelInstance {
        instance: instance.into(),
        reason: reason.into(),
    };
    self.store.enqueue_for_orchestrator(work, None).await?;
    Ok(())
}
/// Derives the current status of `instance` from its event history.
///
/// The most recent `OrchestrationCompleted` or `OrchestrationFailed` event
/// decides a terminal status. Without one, the instance is `Running` if any
/// `OrchestrationStarted` event exists and `NotFound` otherwise. The custom
/// status is attached to every status except `NotFound`.
///
/// # Errors
/// Returns [`ClientError::Provider`] if reading the history fails. A failure
/// to read the custom status is not an error: it is treated as "no custom
/// status, version 0".
pub async fn get_orchestration_status(&self, instance: &str) -> Result<OrchestrationStatus, ClientError> {
    let history = self.store.read(instance).await?;

    // Best-effort lookup: both "nothing stored" and a provider error fall back to (None, 0).
    let (custom_status, custom_status_version) = self
        .store
        .get_custom_status(instance, 0)
        .await
        .ok()
        .flatten()
        .unwrap_or((None, 0));

    // Walk newest-to-oldest so the latest terminal event wins.
    let latest_terminal = history.iter().rev().find_map(|event| match &event.kind {
        EventKind::OrchestrationCompleted { output } => Some(Ok(output.clone())),
        EventKind::OrchestrationFailed { details } => Some(Err(details.clone())),
        _ => None,
    });

    let status = match latest_terminal {
        Some(Ok(output)) => OrchestrationStatus::Completed {
            output,
            custom_status,
            custom_status_version,
        },
        Some(Err(details)) => OrchestrationStatus::Failed {
            details,
            custom_status,
            custom_status_version,
        },
        None => {
            let started = history
                .iter()
                .any(|event| matches!(&event.kind, EventKind::OrchestrationStarted { .. }));
            if started {
                OrchestrationStatus::Running {
                    custom_status,
                    custom_status_version,
                }
            } else {
                OrchestrationStatus::NotFound
            }
        }
    };
    Ok(status)
}
/// Polls until `instance` reaches a `Completed` or `Failed` status, or `timeout` elapses.
///
/// The status is checked immediately, then re-checked after sleeps that start
/// at `INITIAL_POLL_DELAY_MS` and grow by `POLL_DELAY_MULTIPLIER` up to
/// `MAX_POLL_DELAY_MS`. Each sleep is clamped to the time remaining, so the
/// call does not run noticeably past the deadline, and the status is checked
/// one last time once the deadline is reached.
///
/// A `timeout` too large to be represented as an `Instant` is treated as
/// "no deadline" instead of panicking.
///
/// # Errors
/// Returns [`ClientError::Timeout`] if no terminal status is seen in time, or
/// the error from `get_orchestration_status` if a poll fails.
pub async fn wait_for_orchestration(
    &self,
    instance: &str,
    timeout: std::time::Duration,
) -> Result<OrchestrationStatus, ClientError> {
    // `Instant + Duration` panics on overflow; `checked_add` yields `None` instead.
    let deadline = std::time::Instant::now().checked_add(timeout);
    let mut delay_ms: u64 = INITIAL_POLL_DELAY_MS;
    loop {
        match self.get_orchestration_status(instance).await? {
            s @ (OrchestrationStatus::Completed { .. } | OrchestrationStatus::Failed { .. }) => return Ok(s),
            // Any other status is not terminal yet: keep polling.
            _ => {}
        }

        let mut pause = std::time::Duration::from_millis(delay_ms);
        if let Some(deadline) = deadline {
            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
            if remaining.is_zero() {
                return Err(ClientError::Timeout);
            }
            // Never sleep past the deadline.
            pause = pause.min(remaining);
        }
        tokio::time::sleep(pause).await;
        delay_ms = delay_ms.saturating_mul(POLL_DELAY_MULTIPLIER).min(MAX_POLL_DELAY_MS);
    }
}
/// Waits for `instance` to finish and decodes its output as `Out`.
///
/// The outer `Result` reports client-side problems; the inner one is the
/// orchestration outcome: `Ok(value)` for a completed orchestration and
/// `Err(message)` (from `details.display_message()`) for a failed one.
///
/// # Errors
/// Propagates any error from `wait_for_orchestration`, and returns
/// [`ClientError::InvalidInput`] if the completed output cannot be decoded.
pub async fn wait_for_orchestration_typed<Out: serde::de::DeserializeOwned>(
    &self,
    instance: &str,
    timeout: std::time::Duration,
) -> Result<Result<Out, String>, ClientError> {
    let status = self.wait_for_orchestration(instance, timeout).await?;
    match status {
        OrchestrationStatus::Completed { output, .. } => {
            Json::decode::<Out>(&output)
                .map(Ok)
                .map_err(|e| ClientError::InvalidInput {
                    message: format!("decode failed: {e}"),
                })
        }
        OrchestrationStatus::Failed { details, .. } => Ok(Err(details.display_message())),
        _ => unreachable!("wait_for_orchestration returns only terminal or timeout"),
    }
}
/// Polls until the custom status of `instance` changes or the orchestration
/// leaves the `Running`/`NotFound` states.
///
/// Each round asks the provider for `get_custom_status(instance, last_seen_version)`;
/// a `Some` answer returns the current orchestration status immediately.
/// (Presumably the provider answers `Some` only for a version newer than
/// `last_seen_version` - confirm against the provider contract.) Otherwise the
/// orchestration status is checked: anything other than `Running` or
/// `NotFound` is returned, else the call sleeps for `poll_interval`, clamped
/// to the time remaining.
///
/// A `timeout` too large to be represented as an `Instant` is treated as
/// "no deadline" instead of panicking.
///
/// # Errors
/// Returns [`ClientError::Timeout`] once the deadline passes, or a provider
/// error from either lookup.
pub async fn wait_for_status_change(
    &self,
    instance: &str,
    last_seen_version: u64,
    poll_interval: std::time::Duration,
    timeout: std::time::Duration,
) -> Result<OrchestrationStatus, ClientError> {
    // `Instant + Duration` panics on overflow; `checked_add` yields `None` instead.
    let deadline = std::time::Instant::now().checked_add(timeout);
    while deadline.map_or(true, |d| std::time::Instant::now() < d) {
        if self
            .store
            .get_custom_status(instance, last_seen_version)
            .await?
            .is_some()
        {
            return self.get_orchestration_status(instance).await;
        }
        match self.get_orchestration_status(instance).await? {
            OrchestrationStatus::Running { .. } | OrchestrationStatus::NotFound => {
                // Never sleep past the deadline.
                let pause = match deadline {
                    Some(d) => poll_interval.min(d.saturating_duration_since(std::time::Instant::now())),
                    None => poll_interval,
                };
                tokio::time::sleep(pause).await;
            }
            terminal => return Ok(terminal),
        }
    }
    Err(ClientError::Timeout)
}
pub async fn get_kv_value(&self, instance: &str, key: &str) -> Result<Option<String>, ClientError> {
self.store.get_kv_value(instance, key).await.map_err(ClientError::from)
}
/// Reads the value stored under `key` for `instance` and deserializes it from JSON as `T`.
///
/// Returns `Ok(None)` when no value is stored.
///
/// # Errors
/// Propagates errors from `get_kv_value`, and returns
/// [`ClientError::InvalidInput`] if the stored text is not valid JSON for `T`.
pub async fn get_kv_value_typed<T: serde::de::DeserializeOwned>(
    &self,
    instance: &str,
    key: &str,
) -> Result<Option<T>, ClientError> {
    self.get_kv_value(instance, key)
        .await?
        .map(|raw| serde_json::from_str::<T>(&raw))
        .transpose()
        .map_err(|e| ClientError::InvalidInput {
            message: format!("KV deserialization error: {e}"),
        })
}
pub async fn get_kv_all_values(
&self,
instance: &str,
) -> Result<std::collections::HashMap<String, String>, ClientError> {
self.store.get_kv_all_values(instance).await.map_err(ClientError::from)
}
/// Polls until a value exists under `key` for `instance`, or `timeout` elapses.
///
/// The key is checked immediately, then re-checked after sleeps that start at
/// `INITIAL_POLL_DELAY_MS` and grow by `POLL_DELAY_MULTIPLIER` up to
/// `MAX_POLL_DELAY_MS`. Each sleep is clamped to the time remaining, so the
/// call does not run noticeably past the deadline, and the key is checked one
/// last time once the deadline is reached.
///
/// A `timeout` too large to be represented as an `Instant` is treated as
/// "no deadline" instead of panicking.
///
/// # Errors
/// Returns [`ClientError::Timeout`] if the key never appears in time, or the
/// error from `get_kv_value` if a poll fails.
pub async fn wait_for_kv_value(
    &self,
    instance: &str,
    key: &str,
    timeout: std::time::Duration,
) -> Result<String, ClientError> {
    // `Instant + Duration` panics on overflow; `checked_add` yields `None` instead.
    let deadline = std::time::Instant::now().checked_add(timeout);
    let mut delay_ms: u64 = INITIAL_POLL_DELAY_MS;
    loop {
        if let Some(val) = self.get_kv_value(instance, key).await? {
            return Ok(val);
        }

        let mut pause = std::time::Duration::from_millis(delay_ms);
        if let Some(deadline) = deadline {
            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
            if remaining.is_zero() {
                return Err(ClientError::Timeout);
            }
            // Never sleep past the deadline.
            pause = pause.min(remaining);
        }
        tokio::time::sleep(pause).await;
        delay_ms = delay_ms.saturating_mul(POLL_DELAY_MULTIPLIER).min(MAX_POLL_DELAY_MS);
    }
}
/// Waits for a value under `key` via `wait_for_kv_value`, then deserializes it from JSON as `T`.
///
/// # Errors
/// Propagates errors from `wait_for_kv_value` (including
/// [`ClientError::Timeout`]), and returns [`ClientError::InvalidInput`] if
/// the stored text is not valid JSON for `T`.
pub async fn wait_for_kv_value_typed<T: serde::de::DeserializeOwned>(
    &self,
    instance: &str,
    key: &str,
    timeout: std::time::Duration,
) -> Result<T, ClientError> {
    let text = self.wait_for_kv_value(instance, key, timeout).await?;
    match serde_json::from_str(&text) {
        Ok(value) => Ok(value),
        Err(e) => Err(ClientError::InvalidInput {
            message: format!("KV deserialization error: {e}"),
        }),
    }
}
/// Reports whether the provider exposes a management (`ProviderAdmin`) capability.
pub fn has_management_capability(&self) -> bool {
    self.store.as_management_capability().is_some()
}
/// Returns the provider's management interface, or
/// [`ClientError::ManagementNotAvailable`] when the provider exposes none.
fn discover_management(&self) -> Result<&dyn ProviderAdmin, ClientError> {
    match self.store.as_management_capability() {
        Some(admin) => Ok(admin),
        None => Err(ClientError::ManagementNotAvailable),
    }
}
pub async fn get_orchestration_stats(&self, instance: &str) -> Result<Option<crate::SystemStats>, ClientError> {
self.store.get_instance_stats(instance).await.map_err(ClientError::from)
}
pub async fn list_all_instances(&self) -> Result<Vec<String>, ClientError> {
self.discover_management()?
.list_instances()
.await
.map_err(ClientError::from)
}
pub async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ClientError> {
self.discover_management()?
.list_instances_by_status(status)
.await
.map_err(ClientError::from)
}
pub async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ClientError> {
self.discover_management()?
.get_instance_info(instance)
.await
.map_err(ClientError::from)
}
pub async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ClientError> {
self.discover_management()?
.get_execution_info(instance, execution_id)
.await
.map_err(ClientError::from)
}
pub async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ClientError> {
let mgmt = self.discover_management()?;
mgmt.list_executions(instance).await.map_err(ClientError::from)
}
pub async fn read_execution_history(
&self,
instance: &str,
execution_id: u64,
) -> Result<Vec<crate::Event>, ClientError> {
let mgmt = self.discover_management()?;
mgmt.read_history_with_execution_id(instance, execution_id)
.await
.map_err(ClientError::from)
}
pub async fn get_system_metrics(&self) -> Result<SystemMetrics, ClientError> {
self.discover_management()?
.get_system_metrics()
.await
.map_err(ClientError::from)
}
pub async fn get_queue_depths(&self) -> Result<QueueDepths, ClientError> {
self.discover_management()?
.get_queue_depths()
.await
.map_err(ClientError::from)
}
pub async fn get_instance_tree(&self, instance_id: &str) -> Result<InstanceTree, ClientError> {
let mgmt = self.discover_management()?;
mgmt.get_instance_tree(instance_id).await.map_err(ClientError::from)
}
/// Deletes `instance_id` through the provider's management interface and logs
/// the deletion counts.
///
/// `force` is passed to the provider unchanged.
///
/// # Errors
/// Returns [`ClientError::ManagementNotAvailable`] without management
/// support. Provider failures go through `translate_delete_error`, so they
/// may surface as `InstanceNotFound`, `InstanceStillRunning`,
/// `CannotDeleteSubOrchestration`, or a plain `Provider` error.
pub async fn delete_instance(&self, instance_id: &str, force: bool) -> Result<DeleteInstanceResult, ClientError> {
    let admin = self.discover_management()?;
    let outcome = match admin.delete_instance(instance_id, force).await {
        Ok(outcome) => outcome,
        Err(err) => return Err(Self::translate_delete_error(err, instance_id)),
    };
    info!(
        instance_id = %instance_id,
        force = %force,
        instances_deleted = %outcome.instances_deleted,
        executions_deleted = %outcome.executions_deleted,
        events_deleted = %outcome.events_deleted,
        queue_messages_deleted = %outcome.queue_messages_deleted,
        "Deleted instance"
    );
    Ok(outcome)
}
pub async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ClientError> {
let result = self
.discover_management()?
.delete_instance_bulk(filter.clone())
.await
.map_err(ClientError::from)?;
info!(
filter = ?filter,
instances_deleted = %result.instances_deleted,
executions_deleted = %result.executions_deleted,
events_deleted = %result.events_deleted,
queue_messages_deleted = %result.queue_messages_deleted,
"Bulk deleted instances"
);
Ok(result)
}
pub async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ClientError> {
let result = self
.discover_management()?
.prune_executions(instance_id, options.clone())
.await
.map_err(ClientError::from)?;
info!(
instance_id = %instance_id,
options = ?options,
executions_deleted = %result.executions_deleted,
events_deleted = %result.events_deleted,
instances_processed = %result.instances_processed,
"Pruned executions"
);
Ok(result)
}
pub async fn prune_executions_bulk(
&self,
filter: InstanceFilter,
options: PruneOptions,
) -> Result<PruneResult, ClientError> {
let result = self
.discover_management()?
.prune_executions_bulk(filter.clone(), options.clone())
.await
.map_err(ClientError::from)?;
info!(
filter = ?filter,
options = ?options,
executions_deleted = %result.executions_deleted,
events_deleted = %result.events_deleted,
instances_processed = %result.instances_processed,
"Bulk pruned executions"
);
Ok(result)
}
/// Maps a provider error from a delete call onto a more specific [`ClientError`].
///
/// Classification is by case-insensitive substring match on the error's
/// display text, checked in this order: "not found", "still running", then
/// "sub-orchestration" or "delete the root". Anything else is wrapped as
/// [`ClientError::Provider`].
fn translate_delete_error(e: ProviderError, instance_id: &str) -> ClientError {
    let lowered = e.to_string().to_lowercase();
    let mentions = |needle: &str| lowered.contains(needle);

    if mentions("not found") {
        return ClientError::InstanceNotFound {
            instance_id: instance_id.to_string(),
        };
    }
    if mentions("still running") {
        return ClientError::InstanceStillRunning {
            instance_id: instance_id.to_string(),
        };
    }
    if mentions("sub-orchestration") || mentions("delete the root") {
        return ClientError::CannotDeleteSubOrchestration {
            instance_id: instance_id.to_string(),
        };
    }
    ClientError::Provider(e)
}
}