Skip to main content

neuron_runtime/
durable.rs

1//! Durable execution contexts for local development and production engines.
2//!
3//! [`LocalDurableContext`] is a passthrough implementation for local dev/testing
4//! that calls the provider and tools directly without journaling.
5//!
6//! For production durable execution (Temporal, Restate, Inngest), see the
7//! documentation at the bottom of this module for how to implement
8//! [`DurableContext`] with those SDKs.
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use neuron_tool::ToolRegistry;
14use neuron_types::{
15    ActivityOptions, CompletionRequest, CompletionResponse, DurableContext, DurableError, Provider,
16    ToolContext, ToolOutput, WasmCompatSend,
17};
18
19/// A passthrough durable context for local development and testing.
20///
21/// Executes LLM calls and tool calls directly without journaling.
22/// No crash recovery, no replay — just direct passthrough.
23///
24/// This is the default when you do not need durable execution but want
25/// a concrete [`DurableContext`] implementation.
26pub struct LocalDurableContext<P: Provider> {
27    provider: Arc<P>,
28    tools: Arc<ToolRegistry>,
29}
30
31impl<P: Provider> LocalDurableContext<P> {
32    /// Create a new local durable context.
33    #[must_use]
34    pub fn new(provider: Arc<P>, tools: Arc<ToolRegistry>) -> Self {
35        Self { provider, tools }
36    }
37}
38
39impl<P: Provider> DurableContext for LocalDurableContext<P> {
40    async fn execute_llm_call(
41        &self,
42        request: CompletionRequest,
43        _options: ActivityOptions,
44    ) -> Result<CompletionResponse, DurableError> {
45        self.provider
46            .complete(request)
47            .await
48            .map_err(|e| DurableError::ActivityFailed(e.to_string()))
49    }
50
51    async fn execute_tool(
52        &self,
53        tool_name: &str,
54        input: serde_json::Value,
55        ctx: &ToolContext,
56        _options: ActivityOptions,
57    ) -> Result<ToolOutput, DurableError> {
58        self.tools
59            .execute(tool_name, input, ctx)
60            .await
61            .map_err(|e| DurableError::ActivityFailed(e.to_string()))
62    }
63
64    async fn wait_for_signal<T: serde::de::DeserializeOwned + WasmCompatSend>(
65        &self,
66        _signal_name: &str,
67        timeout: Duration,
68    ) -> Result<Option<T>, DurableError> {
69        // Local context: no signals, just wait for the timeout
70        tokio::time::sleep(timeout).await;
71        Ok(None)
72    }
73
74    fn should_continue_as_new(&self) -> bool {
75        false
76    }
77
78    async fn continue_as_new(&self, _state: serde_json::Value) -> Result<(), DurableError> {
79        // Local context: continue-as-new is a no-op
80        Ok(())
81    }
82
83    async fn sleep(&self, duration: Duration) {
84        tokio::time::sleep(duration).await;
85    }
86
87    fn now(&self) -> chrono::DateTime<chrono::Utc> {
88        chrono::Utc::now()
89    }
90}
91
92// =============================================================================
93// Production durable execution implementations
94// =============================================================================
95//
96// ## Temporal
97//
98// To implement `DurableContext` for Temporal, add `temporal-sdk` as a dependency
99// (feature-gated) and wrap the Temporal workflow context:
100//
101// ```ignore
102// use temporal_sdk::WfContext;
103//
104// pub struct TemporalDurableContext {
105//     ctx: WfContext,
106//     tools: Arc<ToolRegistry>,
107// }
108//
109// impl DurableContext for TemporalDurableContext {
110//     async fn execute_llm_call(
111//         &self,
112//         request: CompletionRequest,
113//         options: ActivityOptions,
114//     ) -> Result<CompletionResponse, DurableError> {
115//         let input = serde_json::to_string(&request)
116//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
117//         let result = self.ctx
118//             .activity(ActivityOptions {
119//                 activity_type: "llm_call".to_string(),
120//                 start_to_close_timeout: Some(options.start_to_close_timeout),
121//                 heartbeat_timeout: options.heartbeat_timeout,
122//                 retry_policy: options.retry_policy.map(convert_retry_policy),
123//                 input: vec![input.into()],
124//                 ..Default::default()
125//             })
126//             .await
127//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
128//         serde_json::from_slice(&result.result)
129//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))
130//     }
131//
132//     // ... similar for execute_tool, wait_for_signal, etc.
133//
134//     fn should_continue_as_new(&self) -> bool {
135//         self.ctx.get_info().is_continue_as_new_suggested()
136//     }
137//
138//     async fn sleep(&self, duration: Duration) {
139//         self.ctx.timer(duration).await;
140//     }
141//
142//     fn now(&self) -> chrono::DateTime<chrono::Utc> {
143//         // Temporal provides deterministic time during replay
144//         self.ctx.workflow_time()
145//     }
146// }
147// ```
148//
149// ## Restate
150//
151// To implement `DurableContext` for Restate, add `restate-sdk` as a dependency
152// (feature-gated) and wrap the Restate context:
153//
154// ```ignore
155// use restate_sdk::context::Context;
156//
157// pub struct RestateDurableContext {
158//     ctx: Context,
159//     tools: Arc<ToolRegistry>,
160// }
161//
162// impl DurableContext for RestateDurableContext {
163//     async fn execute_llm_call(
164//         &self,
165//         request: CompletionRequest,
166//         _options: ActivityOptions,
167//     ) -> Result<CompletionResponse, DurableError> {
168//         self.ctx
169//             .run("llm_call", || async {
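//                 // Direct LLM call here — Restate journals the result.
//                 // NOTE: `provider` stands for an LLM provider handle; the
//                 // sketch struct above declares only `ctx` and `tools`, so a
//                 // real implementation must add and capture one.
//                 provider.complete(request).await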
172//             })
173//             .await
174//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))
175//     }
176//
177//     // ... similar for execute_tool, etc.
178//
179//     async fn sleep(&self, duration: Duration) {
180//         self.ctx.sleep(duration).await;
181//     }
182// }
183// ```