juncture_core/func/mod.rs
1//! Functional API for defining workflows with plain functions
2//!
3//! This module provides the functional entrypoint/task API as an alternative
4//! to [`StateGraph`](crate::graph::StateGraph). Users can define workflows
5//! using ordinary async functions with runtime context instead of manually
6//! building graphs.
7//!
8//! # Concepts
9//!
10//! - **Entrypoint functions** - Main workflow functions that can be compiled into graphs
11//! - **Task configuration** - Reusable functions with retry/cache/timeout policies
12//! - **`Runtime<S>`** - Provides access to previous state, checkpointer, and store
13//!
14//! # Architecture
15//!
16//! The functional API is a lightweight wrapper around `StateGraph`:
17//! - Entrypoint functions compile to single-node graphs
18//! - Task functions use [`TaskConfig`] for per-node configuration
19//! - [`Runtime<S>`] provides the same context as [`CoreRuntime`](crate::runtime::Runtime)
20//! with additional functional-API-specific features
21//!
22//! # Example
23//!
24//! ```ignore
25//! use juncture_core::func::{compile_entrypoint, Runtime};
26//! use juncture_core::checkpoint::MemorySaver;
27//! use juncture_core::state::CowState;
28//! use juncture_core::runtime::Runtime as CoreRuntime;
29//!
30//! // Define the workflow function
31//! async fn my_workflow(
32//! state: CowState<MyState>,
33//! runtime: &CoreRuntime<MyState>,
34//! ) -> Result<MyStateUpdate, JunctureError> {
35//! Ok(MyStateUpdate::default())
36//! }
37//!
38//! // Compile into a graph
39//! let graph = compile_entrypoint::<MyState, Input, Output, _>(
40//! my_workflow,
41//! Some(Arc::new(MemorySaver::new()))
42//! )?;
43//!
44//! // Execute
45//! let result = graph.invoke(input, &config).await?;
46//! ```
47
48use std::sync::Arc;
49
50use crate::checkpoint::CheckpointSaver;
51use crate::config::{EntrypointConfig, TaskConfig};
52use crate::graph::{StateGraph, TopologyError};
53use crate::node::IntoNode;
54use crate::runtime::Runtime as CoreRuntime;
55use crate::state::{FromState, IntoState, State};
56use crate::store::Store;
57
58/// Runtime context for functional API workflows
59///
60/// Provides access to previous execution state, checkpointing, and storage
61/// during workflow execution. This type extends [`CoreRuntime`] with
62/// functional-API-specific features like previous value access.
63///
64/// # Type Parameters
65///
66/// * `S` - State type (must implement [`State`] and [`Default`])
67///
68/// # Fields
69///
70/// - `previous` - Previous execution return value (for accumulation patterns)
71/// - `checkpointer` - Checkpoint saver for state persistence
72/// - `store` - Cross-thread persistent key-value store
73/// - `core` - Underlying core runtime for advanced use cases
74///
75/// # Examples
76///
77/// ## Accessing previous state
78///
79/// ```ignore
80/// use juncture_core::func::Runtime;
81///
82/// async fn accumulating_workflow(
83/// state: CowState<MyState>,
84/// runtime: &CoreRuntime<MyState>,
85/// ) -> Result<MyStateUpdate, JunctureError> {
86/// // Access the functional runtime
87/// let func_runtime = Runtime::from_core(runtime);
88///
89/// // Get the previous return value
90/// if let Some(previous) = &func_runtime.previous {
91/// let prev_output: Output = serde_json::from_value(previous.clone())
92/// .map_err(|e| JunctureError::execution(format!("Failed to deserialize previous: {}", e)))?;
93/// // Use previous value for accumulation
94/// }
95///
96/// Ok(MyStateUpdate::default())
97/// }
98/// ```
99///
100/// ## Using the store
101///
102/// ```ignore
103/// use juncture_core::func::Runtime;
104///
105/// async fn workflow_with_store(
106/// state: CowState<MyState>,
107/// runtime: &CoreRuntime<MyState>,
108/// ) -> Result<MyStateUpdate, JunctureError> {
109/// let func_runtime = Runtime::from_core(runtime);
110///
111/// if let Some(store) = &func_runtime.store {
112/// store.put("key", serde_json::json!("value")).await?;
113/// }
114///
115/// Ok(MyStateUpdate::default())
116/// }
117/// ```
118#[derive(Clone)]
119pub struct Runtime<S: State + Default> {
120 /// Previous execution return value (for accumulation patterns)
121 ///
122 /// When resuming from a checkpoint, this contains the return value from
123 /// the previous execution. For first-time execution, this is `None`.
124 pub previous: Option<serde_json::Value>,
125
126 /// Checkpoint saver for state persistence
127 ///
128 /// When set, the workflow can save and restore intermediate state.
129 pub checkpointer: Option<Arc<dyn CheckpointSaver>>,
130
131 /// Cross-thread persistent key-value store
132 ///
133 /// Provides durable storage that survives across workflow executions.
134 pub store: Option<Arc<dyn Store>>,
135
136 /// Underlying core runtime
137 ///
138 /// Provides access to advanced runtime features like heartbeat,
139 /// execution metadata, and streaming.
140 pub core: CoreRuntime<S>,
141}
142
143impl<S: State + Default + std::fmt::Debug> std::fmt::Debug for Runtime<S> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("Runtime")
146 .field("previous", &self.previous)
147 .field(
148 "checkpointer",
149 &self.checkpointer.as_ref().map(|_| "<CheckpointSaver>"),
150 )
151 .field("store", &self.store.as_ref().map(|_| "<Store>"))
152 .field("core", &self.core)
153 .finish()
154 }
155}
156
157impl<S: State + Default> Runtime<S> {
158 /// Create a new runtime with minimal configuration
159 #[must_use]
160 pub fn new() -> Self
161 where
162 S: Default,
163 {
164 Self {
165 previous: None,
166 checkpointer: None,
167 store: None,
168 core: CoreRuntime::new(),
169 }
170 }
171
172 /// Create a functional runtime from a core runtime
173 ///
174 /// This extracts functional-API-specific context from the core runtime,
175 /// allowing entrypoint functions to access features like previous values.
176 #[must_use]
177 pub fn from_core(core: &CoreRuntime<S>) -> Self {
178 Self {
179 previous: core.previous.clone(),
180 checkpointer: None,
181 store: core.store.clone(),
182 core: core.clone(),
183 }
184 }
185
186 /// Create a runtime from an entrypoint configuration
187 #[must_use]
188 pub fn from_entrypoint_config(config: &EntrypointConfig) -> Self
189 where
190 S: Default,
191 {
192 Self {
193 previous: None,
194 checkpointer: config.checkpointer.clone(),
195 store: config.store.clone(),
196 core: CoreRuntime::new(),
197 }
198 }
199
200 /// Set the previous execution value
201 #[must_use]
202 pub fn with_previous(mut self, previous: serde_json::Value) -> Self {
203 self.previous = Some(previous);
204 self
205 }
206
207 /// Set the checkpointer
208 #[must_use]
209 pub fn with_checkpointer(mut self, checkpointer: Arc<dyn CheckpointSaver>) -> Self {
210 self.checkpointer = Some(checkpointer);
211 self
212 }
213
214 /// Set the store
215 #[must_use]
216 pub fn with_store(mut self, store: Arc<dyn Store>) -> Self {
217 self.store = Some(store);
218 self
219 }
220
221 /// Set the core runtime
222 #[must_use]
223 pub fn with_core(mut self, core: CoreRuntime<S>) -> Self {
224 self.core = core;
225 self
226 }
227}
228
229impl<S: State + Default> Default for Runtime<S> {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235/// Compile a functional workflow entrypoint into an executable graph
236///
237/// This function wraps a simple async function in a [`StateGraph`] with a
238/// single entrypoint node, providing a functional API alternative to manual
239/// graph construction.
240///
241/// # Type Parameters
242///
243/// * `S` - State type
244/// * `I` - Input type (must implement [`IntoState<S>`])
245/// * `O` - Output type (must implement [`FromState<S>`])
246/// * `F` - Function type (must implement [`IntoNode<S>`])
247///
248/// # Parameters
249///
250/// - `func` - The entrypoint function to compile
251/// - `checkpointer` - Optional checkpoint saver for state persistence
252///
253/// # Returns
254///
255/// A compiled graph that can be invoked with [`CompiledGraph::invoke`](crate::graph::CompiledGraph::invoke)
256/// or streamed with [`CompiledGraph::stream`](crate::graph::CompiledGraph::stream).
257///
258/// # Errors
259///
260/// Returns [`TopologyError`] if:
261/// - The function cannot be converted into a node
262/// - The graph structure is invalid
263///
264/// # Examples
265///
266/// ## Basic usage
267///
268/// ```ignore
269/// use juncture_core::func::compile_entrypoint;
270/// use juncture_core::checkpoint::MemorySaver;
271/// use juncture_core::state::CowState;
272/// use juncture_core::runtime::Runtime as CoreRuntime;
273/// use juncture_core::JunctureError;
274///
275/// async fn my_workflow(
276/// state: CowState<MyState>,
277/// runtime: &CoreRuntime<MyState>,
278/// ) -> Result<MyStateUpdate, JunctureError> {
279/// Ok(MyStateUpdate::default())
280/// }
281///
282/// let graph = compile_entrypoint::<MyState, Input, Output, _>(
283/// my_workflow,
284/// Some(Arc::new(MemorySaver::new()))
285/// )?;
286///
287/// let result = graph.invoke(input, &config).await?;
288/// ```
289///
290/// ## With task configuration
291///
292/// ```ignore
293/// use juncture_core::func::compile_entrypoint_with_config;
294/// use juncture_core::config::TaskConfig;
295/// use juncture_core::graph::{RetryPolicy, NodeMetadata};
296/// use std::time::Duration;
297///
298/// let retry_policy = RetryPolicy::max_attempts(3);
299/// let task_config = TaskConfig {
300/// retry_policy: Some(retry_policy.clone()),
301/// cache_policy: None,
302/// timeout: Some(Duration::from_secs(30)),
303/// name: Some("my_workflow".to_string()),
304/// };
305///
306/// let graph = compile_entrypoint_with_config(
307/// my_workflow,
308/// &task_config,
309/// Some(Arc::new(MemorySaver::new()))
310/// )?;
311/// ```
312pub fn compile_entrypoint<S: State + Default, I, O, F>(
313 func: F,
314 checkpointer: Option<Arc<dyn CheckpointSaver>>,
315) -> Result<crate::graph::CompiledGraph<S, I, O>, TopologyError>
316where
317 F: IntoNode<S>,
318 I: IntoState<S>,
319 O: FromState<S>,
320{
321 compile_entrypoint_with_config(func, &TaskConfig::default(), checkpointer)
322}
323
324/// Compile a functional workflow entrypoint with task configuration
325///
326/// This is an extended version of [`compile_entrypoint`] that allows specifying
327/// task-level configuration like retry policies, caching, and timeouts.
328///
329/// # Type Parameters
330///
331/// * `S` - State type
332/// * `I` - Input type (must implement [`IntoState<S>`])
333/// * `O` - Output type (must implement [`FromState<S>`])
334/// * `F` - Function type (must implement [`IntoNode<S>`])
335///
336/// # Parameters
337///
338/// - `func` - The entrypoint function to compile
339/// - `config` - Task configuration for the entrypoint node
340/// - `checkpointer` - Optional checkpoint saver for state persistence
341///
342/// # Returns
343///
344/// A compiled graph with the entrypoint node configured according to `config`.
345///
346/// # Errors
347///
348/// Returns [`TopologyError`] if:
349/// - The function cannot be converted into a node
350/// - The graph structure is invalid
351pub fn compile_entrypoint_with_config<S: State + Default, I, O, F>(
352 func: F,
353 config: &TaskConfig,
354 checkpointer: Option<Arc<dyn CheckpointSaver>>,
355) -> Result<crate::graph::CompiledGraph<S, I, O>, TopologyError>
356where
357 F: IntoNode<S>,
358 I: IntoState<S>,
359 O: FromState<S>,
360{
361 let entrypoint_name = config
362 .name
363 .clone()
364 .unwrap_or_else(|| "__entrypoint__".to_string());
365
366 let retry_policies = config
367 .retry_policy
368 .as_ref()
369 .map(|p| vec![p.clone()])
370 .unwrap_or_default();
371
372 let mut graph = StateGraph::<S, I, O>::new();
373
374 graph.add_node(
375 &entrypoint_name,
376 func,
377 false,
378 None,
379 None,
380 retry_policies,
381 Vec::new(),
382 )?;
383
384 graph.set_entry_point(&entrypoint_name);
385 graph.set_finish_point(&entrypoint_name);
386
387 graph.compile_with_checkpointer(checkpointer)
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use crate::JunctureError;
394 use crate::node::NodeFnUpdate;
395 use crate::state::MessagesState;
396
397 type TestState = MessagesState;
398 type TestStateUpdate = <TestState as State>::Update;
399
400 #[test]
401 fn test_runtime_new() {
402 let runtime = Runtime::<TestState>::new();
403 assert!(runtime.previous.is_none());
404 assert!(runtime.checkpointer.is_none());
405 assert!(runtime.store.is_none());
406 }
407
408 #[test]
409 fn test_runtime_default() {
410 let runtime = Runtime::<TestState>::default();
411 assert!(runtime.previous.is_none());
412 assert!(runtime.checkpointer.is_none());
413 assert!(runtime.store.is_none());
414 }
415
416 #[test]
417 fn test_runtime_with_previous() {
418 let previous = serde_json::json!("previous_value");
419 let runtime = Runtime::<TestState>::new().with_previous(previous.clone());
420 assert_eq!(runtime.previous, Some(previous));
421 }
422
423 #[test]
424 fn test_runtime_from_entrypoint_config() {
425 let config = EntrypointConfig {
426 checkpointer: None,
427 store: None,
428 };
429 let runtime = Runtime::<TestState>::from_entrypoint_config(&config);
430 assert!(runtime.checkpointer.is_none());
431 assert!(runtime.store.is_none());
432 }
433
434 #[test]
435 fn test_runtime_clone() {
436 let runtime = Runtime::<TestState>::new();
437 let _cloned = runtime.clone();
438 assert!(runtime.previous.is_none());
439 assert!(runtime.checkpointer.is_none());
440 }
441
442 #[test]
443 fn test_compile_entrypoint_basic() {
444 let result = compile_entrypoint::<TestState, TestState, TestState, _>(
445 NodeFnUpdate(|_state: &TestState| async {
446 Ok::<TestStateUpdate, JunctureError>(TestStateUpdate::default())
447 }),
448 None,
449 );
450 result.unwrap();
451 }
452
453 #[test]
454 fn test_compile_entrypoint_with_config() {
455 let retry_policy = crate::graph::RetryPolicy {
456 max_attempts: 3,
457 ..Default::default()
458 };
459 let config = TaskConfig {
460 retry_policy: Some(retry_policy),
461 cache_policy: None,
462 timeout: None,
463 name: Some("custom_entrypoint".to_string()),
464 };
465
466 let result = compile_entrypoint_with_config::<TestState, TestState, TestState, _>(
467 NodeFnUpdate(|_state: &TestState| async {
468 Ok::<TestStateUpdate, JunctureError>(TestStateUpdate::default())
469 }),
470 &config,
471 None,
472 );
473 result.unwrap();
474 }
475}
476
477// Rust guideline compliant 2026-05-23