Skip to main content

neuron_runtime/
durable.rs

1//! Durable execution contexts for local development and production engines.
2//!
3//! [`LocalDurableContext`] is a passthrough implementation for local dev/testing
4//! that calls the provider and tools directly without journaling.
5//!
6//! For production durable execution (Temporal, Restate, Inngest), see the
7//! documentation at the bottom of this module for how to implement
8//! [`DurableContext`] with those SDKs.
9
10use std::sync::Arc;
11use std::time::Duration;
12
13use neuron_tool::ToolRegistry;
14use neuron_types::{
15    ActivityOptions, CompletionRequest, CompletionResponse, DurableContext, DurableError, Provider,
16    ToolContext, ToolOutput, WasmCompatSend,
17};
18
19/// A passthrough durable context for local development and testing.
20///
21/// Executes LLM calls and tool calls directly without journaling.
22/// No crash recovery, no replay — just direct passthrough.
23///
24/// This is the default when you do not need durable execution but want
25/// a concrete [`DurableContext`] implementation.
26pub struct LocalDurableContext<P: Provider> {
27    provider: Arc<P>,
28    tools: Arc<ToolRegistry>,
29}
30
31impl<P: Provider> LocalDurableContext<P> {
32    /// Create a new local durable context.
33    #[must_use]
34    pub fn new(provider: Arc<P>, tools: Arc<ToolRegistry>) -> Self {
35        Self { provider, tools }
36    }
37}
38
39impl<P: Provider> DurableContext for LocalDurableContext<P> {
40    async fn execute_llm_call(
41        &self,
42        request: CompletionRequest,
43        _options: ActivityOptions,
44    ) -> Result<CompletionResponse, DurableError> {
45        self.provider
46            .complete(request)
47            .await
48            .map_err(|e| DurableError::ActivityFailed(e.to_string()))
49    }
50
51    async fn execute_tool(
52        &self,
53        tool_name: &str,
54        input: serde_json::Value,
55        ctx: &ToolContext,
56        _options: ActivityOptions,
57    ) -> Result<ToolOutput, DurableError> {
58        self.tools
59            .execute(tool_name, input, ctx)
60            .await
61            .map_err(|e| DurableError::ActivityFailed(e.to_string()))
62    }
63
64    async fn wait_for_signal<T: serde::de::DeserializeOwned + WasmCompatSend>(
65        &self,
66        _signal_name: &str,
67        timeout: Duration,
68    ) -> Result<Option<T>, DurableError> {
69        // Local context: no signals, just wait for the timeout
70        tokio::time::sleep(timeout).await;
71        Ok(None)
72    }
73
74    fn should_continue_as_new(&self) -> bool {
75        false
76    }
77
78    async fn continue_as_new(
79        &self,
80        _state: serde_json::Value,
81    ) -> Result<(), DurableError> {
82        // Local context: continue-as-new is a no-op
83        Ok(())
84    }
85
86    async fn sleep(&self, duration: Duration) {
87        tokio::time::sleep(duration).await;
88    }
89
90    fn now(&self) -> chrono::DateTime<chrono::Utc> {
91        chrono::Utc::now()
92    }
93}
94
95// =============================================================================
96// Production durable execution implementations
97// =============================================================================
98//
99// ## Temporal
100//
101// To implement `DurableContext` for Temporal, add `temporal-sdk` as a dependency
102// (feature-gated) and wrap the Temporal workflow context:
103//
104// ```ignore
105// use temporal_sdk::WfContext;
106//
107// pub struct TemporalDurableContext {
108//     ctx: WfContext,
109//     tools: Arc<ToolRegistry>,
110// }
111//
112// impl DurableContext for TemporalDurableContext {
113//     async fn execute_llm_call(
114//         &self,
115//         request: CompletionRequest,
116//         options: ActivityOptions,
117//     ) -> Result<CompletionResponse, DurableError> {
118//         let input = serde_json::to_string(&request)
119//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
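//         // NOTE: the `ActivityOptions` built below is the Temporal SDK's options
//         // type (it has `activity_type` and `input`), not the
//         // `neuron_types::ActivityOptions` passed in as `options`. Import one of
//         // them under an alias to avoid the name clash.
//         let result = self.ctx
//             .activity(ActivityOptions {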
122//                 activity_type: "llm_call".to_string(),
123//                 start_to_close_timeout: Some(options.start_to_close_timeout),
124//                 heartbeat_timeout: options.heartbeat_timeout,
125//                 retry_policy: options.retry_policy.map(convert_retry_policy),
126//                 input: vec![input.into()],
127//                 ..Default::default()
128//             })
129//             .await
130//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))?;
131//         serde_json::from_slice(&result.result)
132//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))
133//     }
134//
135//     // ... similar for execute_tool, wait_for_signal, etc.
136//
137//     fn should_continue_as_new(&self) -> bool {
138//         self.ctx.get_info().is_continue_as_new_suggested()
139//     }
140//
141//     async fn sleep(&self, duration: Duration) {
142//         self.ctx.timer(duration).await;
143//     }
144//
145//     fn now(&self) -> chrono::DateTime<chrono::Utc> {
146//         // Temporal provides deterministic time during replay
147//         self.ctx.workflow_time()
148//     }
149// }
150// ```
151//
152// ## Restate
153//
154// To implement `DurableContext` for Restate, add `restate-sdk` as a dependency
155// (feature-gated) and wrap the Restate context:
156//
157// ```ignore
158// use restate_sdk::context::Context;
159//
160// pub struct RestateDurableContext {
161//     ctx: Context,
162//     tools: Arc<ToolRegistry>,
163// }
164//
165// impl DurableContext for RestateDurableContext {
166//     async fn execute_llm_call(
167//         &self,
168//         request: CompletionRequest,
169//         _options: ActivityOptions,
170//     ) -> Result<CompletionResponse, DurableError> {
171//         self.ctx
172//             .run("llm_call", || async {
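//                 // Direct LLM call here — Restate journals the result.
//                 // NOTE: `provider` is not a field of the struct sketched above;
//                 // a real implementation must hold or capture a provider handle.
//                 provider.complete(request).await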
175//             })
176//             .await
177//             .map_err(|e| DurableError::ActivityFailed(e.to_string()))
178//     }
179//
180//     // ... similar for execute_tool, etc.
181//
182//     async fn sleep(&self, duration: Duration) {
183//         self.ctx.sleep(duration).await;
184//     }
185// }
186// ```