neuron_runtime/durable.rs
1//! Durable execution contexts for local development and production engines.
2//!
3//! [`LocalDurableContext`] is a passthrough implementation for local dev/testing
4//! that calls the provider and tools directly without journaling.
5//!
6//! For production durable execution (Temporal, Restate, Inngest), see the
7//! documentation at the bottom of this module for how to implement
8//! [`DurableContext`] with those SDKs.
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use neuron_tool::ToolRegistry;
14use neuron_types::{
15 ActivityOptions, CompletionRequest, CompletionResponse, DurableContext, DurableError, Provider,
16 ToolContext, ToolOutput, WasmCompatSend,
17};
18
19/// A passthrough durable context for local development and testing.
20///
21/// Executes LLM calls and tool calls directly without journaling.
22/// No crash recovery, no replay — just direct passthrough.
23///
24/// This is the default when you do not need durable execution but want
25/// a concrete [`DurableContext`] implementation.
26pub struct LocalDurableContext<P: Provider> {
27 provider: Arc<P>,
28 tools: Arc<ToolRegistry>,
29}
30
31impl<P: Provider> LocalDurableContext<P> {
32 /// Create a new local durable context.
33 #[must_use]
34 pub fn new(provider: Arc<P>, tools: Arc<ToolRegistry>) -> Self {
35 Self { provider, tools }
36 }
37}
38
39impl<P: Provider> DurableContext for LocalDurableContext<P> {
40 async fn execute_llm_call(
41 &self,
42 request: CompletionRequest,
43 _options: ActivityOptions,
44 ) -> Result<CompletionResponse, DurableError> {
45 self.provider
46 .complete(request)
47 .await
48 .map_err(|e| DurableError::ActivityFailed(e.to_string()))
49 }
50
51 async fn execute_tool(
52 &self,
53 tool_name: &str,
54 input: serde_json::Value,
55 ctx: &ToolContext,
56 _options: ActivityOptions,
57 ) -> Result<ToolOutput, DurableError> {
58 self.tools
59 .execute(tool_name, input, ctx)
60 .await
61 .map_err(|e| DurableError::ActivityFailed(e.to_string()))
62 }
63
64 async fn wait_for_signal<T: serde::de::DeserializeOwned + WasmCompatSend>(
65 &self,
66 _signal_name: &str,
67 timeout: Duration,
68 ) -> Result<Option<T>, DurableError> {
69 // Local context: no signals, just wait for the timeout
70 tokio::time::sleep(timeout).await;
71 Ok(None)
72 }
73
74 fn should_continue_as_new(&self) -> bool {
75 false
76 }
77
78 async fn continue_as_new(
79 &self,
80 _state: serde_json::Value,
81 ) -> Result<(), DurableError> {
82 // Local context: continue-as-new is a no-op
83 Ok(())
84 }
85
86 async fn sleep(&self, duration: Duration) {
87 tokio::time::sleep(duration).await;
88 }
89
90 fn now(&self) -> chrono::DateTime<chrono::Utc> {
91 chrono::Utc::now()
92 }
93}
94
95// =============================================================================
96// Production durable execution implementations
97// =============================================================================
98//
99// ## Temporal
100//
101// To implement `DurableContext` for Temporal, add `temporal-sdk` as a dependency
102// (feature-gated) and wrap the Temporal workflow context:
103//
104// ```ignore
105// use temporal_sdk::WfContext;
106//
107// pub struct TemporalDurableContext {
108// ctx: WfContext,
109// tools: Arc<ToolRegistry>,
110// }
111//
112// impl DurableContext for TemporalDurableContext {
113// async fn execute_llm_call(
114// &self,
115// request: CompletionRequest,
116// options: ActivityOptions,
117// ) -> Result<CompletionResponse, DurableError> {
118// let input = serde_json::to_string(&request)
119// .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
120// let result = self.ctx
121// .activity(temporal_sdk::ActivityOptions {
122// activity_type: "llm_call".to_string(),
123// start_to_close_timeout: Some(options.start_to_close_timeout),
124// heartbeat_timeout: options.heartbeat_timeout,
125// retry_policy: options.retry_policy.map(convert_retry_policy),
126// input: vec![input.into()],
127// ..Default::default()
128// })
129// .await
130// .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
131// serde_json::from_slice(&result.result)
132// .map_err(|e| DurableError::ActivityFailed(e.to_string()))
133// }
134//
135// // ... similar for execute_tool, wait_for_signal, etc.
136//
137// fn should_continue_as_new(&self) -> bool {
138// self.ctx.get_info().is_continue_as_new_suggested()
139// }
140//
141// async fn sleep(&self, duration: Duration) {
142// self.ctx.timer(duration).await;
143// }
144//
145// fn now(&self) -> chrono::DateTime<chrono::Utc> {
146// // Temporal provides deterministic time during replay
147// self.ctx.workflow_time()
148// }
149// }
150// ```
151//
152// ## Restate
153//
154// To implement `DurableContext` for Restate, add `restate-sdk` as a dependency
155// (feature-gated) and wrap the Restate context:
156//
157// ```ignore
158// use restate_sdk::context::Context;
159//
160// pub struct RestateDurableContext {
161// ctx: Context,
162// tools: Arc<ToolRegistry>,
163// }
164//
165// impl DurableContext for RestateDurableContext {
166// async fn execute_llm_call(
167// &self,
168// request: CompletionRequest,
169// _options: ActivityOptions,
170// ) -> Result<CompletionResponse, DurableError> {
171// self.ctx
172// .run("llm_call", || async {
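173// // Direct LLM call here — Restate journals the result (`provider` is a placeholder for a captured provider handle; the struct sketch above omits that field)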
174// provider.complete(request).await
175// })
176// .await
177// .map_err(|e| DurableError::ActivityFailed(e.to_string()))
178// }
179//
180// // ... similar for execute_tool, etc.
181//
182// async fn sleep(&self, duration: Duration) {
183// self.ctx.sleep(duration).await;
184// }
185// }
186// ```