use dapr_durabletask::api::Result;
use dapr_durabletask::client::TaskHubGrpcClient;
use dapr_durabletask::task::{ActivityContext, OrchestrationContext};
use dapr_durabletask::worker::{ActivityResult, OrchestratorResult, TaskHubGrpcWorker};
async fn say_hello(_ctx: ActivityContext, input: Option<String>) -> ActivityResult {
let name: String = serde_json::from_str(input.as_deref().unwrap_or("\"World\""))?;
let greeting = format!("Hello, {}!", name);
Ok(Some(serde_json::to_string(&greeting)?))
}
async fn sequence(ctx: OrchestrationContext) -> OrchestratorResult {
let r1: String = serde_json::from_str(
ctx.call_activity("say_hello", "Tokyo")
.await?
.as_deref()
.unwrap_or("null"),
)?;
let r2: String = serde_json::from_str(
ctx.call_activity("say_hello", "Seattle")
.await?
.as_deref()
.unwrap_or("null"),
)?;
let r3: String = serde_json::from_str(
ctx.call_activity("say_hello", "London")
.await?
.as_deref()
.unwrap_or("null"),
)?;
let results = vec![r1, r2, r3];
Ok(Some(serde_json::to_string(&results)?))
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let address = "http://localhost:4001";
let mut worker = TaskHubGrpcWorker::new(address);
worker
.registry_mut()
.add_named_orchestrator("sequence", sequence);
worker
.registry_mut()
.add_named_activity("say_hello", say_hello);
let shutdown = tokio_util::sync::CancellationToken::new();
let worker_shutdown = shutdown.clone();
let worker_handle = tokio::spawn(async move { worker.start(worker_shutdown).await });
let mut client = TaskHubGrpcClient::new(address).await?;
let instance_id = client
.schedule_new_orchestration("sequence", None, None, None)
.await?;
println!("Started orchestration: {}", instance_id);
let state = client
.wait_for_orchestration_completion(&instance_id, true, None)
.await?;
if let Some(state) = state {
println!("Status: {}", state.runtime_status);
println!("Output: {:?}", state.serialized_output);
}
shutdown.cancel();
let _ = worker_handle.await;
Ok(())
}