Skip to main content

juncture_core/func/
mod.rs

1//! Functional API for defining workflows with plain functions
2//!
3//! This module provides the functional entrypoint/task API as an alternative
4//! to [`StateGraph`](crate::graph::StateGraph). Users can define workflows
5//! using ordinary async functions with runtime context instead of manually
6//! building graphs.
7//!
8//! # Concepts
9//!
10//! - **Entrypoint functions** - Main workflow functions that can be compiled into graphs
11//! - **Task configuration** - Reusable functions with retry/cache/timeout policies
12//! - **`Runtime<S>`** - Provides access to previous state, checkpointer, and store
13//!
14//! # Architecture
15//!
16//! The functional API is a lightweight wrapper around `StateGraph`:
17//! - Entrypoint functions compile to single-node graphs
18//! - Task functions use [`TaskConfig`] for per-node configuration
19//! - [`Runtime<S>`] provides the same context as [`CoreRuntime`](crate::runtime::Runtime)
20//!   with additional functional-API-specific features
21//!
22//! # Example
23//!
24//! ```ignore
25//! use juncture_core::func::{compile_entrypoint, Runtime};
26//! use juncture_core::checkpoint::MemorySaver;
27//! use juncture_core::state::CowState;
28//! use juncture_core::runtime::Runtime as CoreRuntime;
29//!
30//! // Define the workflow function
31//! async fn my_workflow(
32//!     state: CowState<MyState>,
33//!     runtime: &CoreRuntime<MyState>,
34//! ) -> Result<MyStateUpdate, JunctureError> {
35//!     Ok(MyStateUpdate::default())
36//! }
37//!
38//! // Compile into a graph
39//! let graph = compile_entrypoint::<MyState, Input, Output, _>(
40//!     my_workflow,
41//!     Some(Arc::new(MemorySaver::new()))
42//! )?;
43//!
44//! // Execute
45//! let result = graph.invoke(input, &config).await?;
46//! ```
47
48use std::sync::Arc;
49
50use crate::checkpoint::CheckpointSaver;
51use crate::config::{EntrypointConfig, TaskConfig};
52use crate::graph::{StateGraph, TopologyError};
53use crate::node::IntoNode;
54use crate::runtime::Runtime as CoreRuntime;
55use crate::state::{FromState, IntoState, State};
56use crate::store::Store;
57
58/// Runtime context for functional API workflows
59///
60/// Provides access to previous execution state, checkpointing, and storage
61/// during workflow execution. This type extends [`CoreRuntime`] with
62/// functional-API-specific features like previous value access.
63///
64/// # Type Parameters
65///
66/// * `S` - State type (must implement [`State`] and [`Default`])
67///
68/// # Fields
69///
70/// - `previous` - Previous execution return value (for accumulation patterns)
71/// - `checkpointer` - Checkpoint saver for state persistence
72/// - `store` - Cross-thread persistent key-value store
73/// - `core` - Underlying core runtime for advanced use cases
74///
75/// # Examples
76///
77/// ## Accessing previous state
78///
79/// ```ignore
80/// use juncture_core::func::Runtime;
81///
82/// async fn accumulating_workflow(
83///     state: CowState<MyState>,
84///     runtime: &CoreRuntime<MyState>,
85/// ) -> Result<MyStateUpdate, JunctureError> {
86///     // Access the functional runtime
87///     let func_runtime = Runtime::from_core(runtime);
88///
89///     // Get the previous return value
90///     if let Some(previous) = &func_runtime.previous {
91///         let prev_output: Output = serde_json::from_value(previous.clone())
92///             .map_err(|e| JunctureError::execution(format!("Failed to deserialize previous: {}", e)))?;
93///         // Use previous value for accumulation
94///     }
95///
96///     Ok(MyStateUpdate::default())
97/// }
98/// ```
99///
100/// ## Using the store
101///
102/// ```ignore
103/// use juncture_core::func::Runtime;
104///
105/// async fn workflow_with_store(
106///     state: CowState<MyState>,
107///     runtime: &CoreRuntime<MyState>,
108/// ) -> Result<MyStateUpdate, JunctureError> {
109///     let func_runtime = Runtime::from_core(runtime);
110///
111///     if let Some(store) = &func_runtime.store {
112///         store.put("key", serde_json::json!("value")).await?;
113///     }
114///
115///     Ok(MyStateUpdate::default())
116/// }
117/// ```
118#[derive(Clone)]
119pub struct Runtime<S: State + Default> {
120    /// Previous execution return value (for accumulation patterns)
121    ///
122    /// When resuming from a checkpoint, this contains the return value from
123    /// the previous execution. For first-time execution, this is `None`.
124    pub previous: Option<serde_json::Value>,
125
126    /// Checkpoint saver for state persistence
127    ///
128    /// When set, the workflow can save and restore intermediate state.
129    pub checkpointer: Option<Arc<dyn CheckpointSaver>>,
130
131    /// Cross-thread persistent key-value store
132    ///
133    /// Provides durable storage that survives across workflow executions.
134    pub store: Option<Arc<dyn Store>>,
135
136    /// Underlying core runtime
137    ///
138    /// Provides access to advanced runtime features like heartbeat,
139    /// execution metadata, and streaming.
140    pub core: CoreRuntime<S>,
141}
142
143impl<S: State + Default + std::fmt::Debug> std::fmt::Debug for Runtime<S> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("Runtime")
146            .field("previous", &self.previous)
147            .field(
148                "checkpointer",
149                &self.checkpointer.as_ref().map(|_| "<CheckpointSaver>"),
150            )
151            .field("store", &self.store.as_ref().map(|_| "<Store>"))
152            .field("core", &self.core)
153            .finish()
154    }
155}
156
157impl<S: State + Default> Runtime<S> {
158    /// Create a new runtime with minimal configuration
159    #[must_use]
160    pub fn new() -> Self
161    where
162        S: Default,
163    {
164        Self {
165            previous: None,
166            checkpointer: None,
167            store: None,
168            core: CoreRuntime::new(),
169        }
170    }
171
172    /// Create a functional runtime from a core runtime
173    ///
174    /// This extracts functional-API-specific context from the core runtime,
175    /// allowing entrypoint functions to access features like previous values.
176    #[must_use]
177    pub fn from_core(core: &CoreRuntime<S>) -> Self {
178        Self {
179            previous: core.previous.clone(),
180            checkpointer: None,
181            store: core.store.clone(),
182            core: core.clone(),
183        }
184    }
185
186    /// Create a runtime from an entrypoint configuration
187    #[must_use]
188    pub fn from_entrypoint_config(config: &EntrypointConfig) -> Self
189    where
190        S: Default,
191    {
192        Self {
193            previous: None,
194            checkpointer: config.checkpointer.clone(),
195            store: config.store.clone(),
196            core: CoreRuntime::new(),
197        }
198    }
199
200    /// Set the previous execution value
201    #[must_use]
202    pub fn with_previous(mut self, previous: serde_json::Value) -> Self {
203        self.previous = Some(previous);
204        self
205    }
206
207    /// Set the checkpointer
208    #[must_use]
209    pub fn with_checkpointer(mut self, checkpointer: Arc<dyn CheckpointSaver>) -> Self {
210        self.checkpointer = Some(checkpointer);
211        self
212    }
213
214    /// Set the store
215    #[must_use]
216    pub fn with_store(mut self, store: Arc<dyn Store>) -> Self {
217        self.store = Some(store);
218        self
219    }
220
221    /// Set the core runtime
222    #[must_use]
223    pub fn with_core(mut self, core: CoreRuntime<S>) -> Self {
224        self.core = core;
225        self
226    }
227}
228
229impl<S: State + Default> Default for Runtime<S> {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
235/// Compile a functional workflow entrypoint into an executable graph
236///
237/// This function wraps a simple async function in a [`StateGraph`] with a
238/// single entrypoint node, providing a functional API alternative to manual
239/// graph construction.
240///
241/// # Type Parameters
242///
243/// * `S` - State type
244/// * `I` - Input type (must implement [`IntoState<S>`])
245/// * `O` - Output type (must implement [`FromState<S>`])
246/// * `F` - Function type (must implement [`IntoNode<S>`])
247///
248/// # Parameters
249///
250/// - `func` - The entrypoint function to compile
251/// - `checkpointer` - Optional checkpoint saver for state persistence
252///
253/// # Returns
254///
255/// A compiled graph that can be invoked with [`CompiledGraph::invoke`](crate::graph::CompiledGraph::invoke)
256/// or streamed with [`CompiledGraph::stream`](crate::graph::CompiledGraph::stream).
257///
258/// # Errors
259///
260/// Returns [`TopologyError`] if:
261/// - The function cannot be converted into a node
262/// - The graph structure is invalid
263///
264/// # Examples
265///
266/// ## Basic usage
267///
268/// ```ignore
269/// use juncture_core::func::compile_entrypoint;
270/// use juncture_core::checkpoint::MemorySaver;
271/// use juncture_core::state::CowState;
272/// use juncture_core::runtime::Runtime as CoreRuntime;
273/// use juncture_core::JunctureError;
274///
275/// async fn my_workflow(
276///     state: CowState<MyState>,
277///     runtime: &CoreRuntime<MyState>,
278/// ) -> Result<MyStateUpdate, JunctureError> {
279///     Ok(MyStateUpdate::default())
280/// }
281///
282/// let graph = compile_entrypoint::<MyState, Input, Output, _>(
283///     my_workflow,
284///     Some(Arc::new(MemorySaver::new()))
285/// )?;
286///
287/// let result = graph.invoke(input, &config).await?;
288/// ```
289///
290/// ## With task configuration
291///
292/// ```ignore
293/// use juncture_core::func::compile_entrypoint_with_config;
294/// use juncture_core::config::TaskConfig;
295/// use juncture_core::graph::{RetryPolicy, NodeMetadata};
296/// use std::time::Duration;
297///
298/// let retry_policy = RetryPolicy::max_attempts(3);
299/// let task_config = TaskConfig {
300///     retry_policy: Some(retry_policy.clone()),
301///     cache_policy: None,
302///     timeout: Some(Duration::from_secs(30)),
303///     name: Some("my_workflow".to_string()),
304/// };
305///
306/// let graph = compile_entrypoint_with_config(
307///     my_workflow,
308///     &task_config,
309///     Some(Arc::new(MemorySaver::new()))
310/// )?;
311/// ```
312pub fn compile_entrypoint<S: State + Default, I, O, F>(
313    func: F,
314    checkpointer: Option<Arc<dyn CheckpointSaver>>,
315) -> Result<crate::graph::CompiledGraph<S, I, O>, TopologyError>
316where
317    F: IntoNode<S>,
318    I: IntoState<S>,
319    O: FromState<S>,
320{
321    compile_entrypoint_with_config(func, &TaskConfig::default(), checkpointer)
322}
323
324/// Compile a functional workflow entrypoint with task configuration
325///
326/// This is an extended version of [`compile_entrypoint`] that allows specifying
327/// task-level configuration like retry policies, caching, and timeouts.
328///
329/// # Type Parameters
330///
331/// * `S` - State type
332/// * `I` - Input type (must implement [`IntoState<S>`])
333/// * `O` - Output type (must implement [`FromState<S>`])
334/// * `F` - Function type (must implement [`IntoNode<S>`])
335///
336/// # Parameters
337///
338/// - `func` - The entrypoint function to compile
339/// - `config` - Task configuration for the entrypoint node
340/// - `checkpointer` - Optional checkpoint saver for state persistence
341///
342/// # Returns
343///
344/// A compiled graph with the entrypoint node configured according to `config`.
345///
346/// # Errors
347///
348/// Returns [`TopologyError`] if:
349/// - The function cannot be converted into a node
350/// - The graph structure is invalid
351pub fn compile_entrypoint_with_config<S: State + Default, I, O, F>(
352    func: F,
353    config: &TaskConfig,
354    checkpointer: Option<Arc<dyn CheckpointSaver>>,
355) -> Result<crate::graph::CompiledGraph<S, I, O>, TopologyError>
356where
357    F: IntoNode<S>,
358    I: IntoState<S>,
359    O: FromState<S>,
360{
361    let entrypoint_name = config
362        .name
363        .clone()
364        .unwrap_or_else(|| "__entrypoint__".to_string());
365
366    let retry_policies = config
367        .retry_policy
368        .as_ref()
369        .map(|p| vec![p.clone()])
370        .unwrap_or_default();
371
372    let mut graph = StateGraph::<S, I, O>::new();
373
374    graph.add_node(
375        &entrypoint_name,
376        func,
377        false,
378        None,
379        None,
380        retry_policies,
381        Vec::new(),
382    )?;
383
384    graph.set_entry_point(&entrypoint_name);
385    graph.set_finish_point(&entrypoint_name);
386
387    graph.compile_with_checkpointer(checkpointer)
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393    use crate::JunctureError;
394    use crate::node::NodeFnUpdate;
395    use crate::state::MessagesState;
396
397    type TestState = MessagesState;
398    type TestStateUpdate = <TestState as State>::Update;
399
400    #[test]
401    fn test_runtime_new() {
402        let runtime = Runtime::<TestState>::new();
403        assert!(runtime.previous.is_none());
404        assert!(runtime.checkpointer.is_none());
405        assert!(runtime.store.is_none());
406    }
407
408    #[test]
409    fn test_runtime_default() {
410        let runtime = Runtime::<TestState>::default();
411        assert!(runtime.previous.is_none());
412        assert!(runtime.checkpointer.is_none());
413        assert!(runtime.store.is_none());
414    }
415
416    #[test]
417    fn test_runtime_with_previous() {
418        let previous = serde_json::json!("previous_value");
419        let runtime = Runtime::<TestState>::new().with_previous(previous.clone());
420        assert_eq!(runtime.previous, Some(previous));
421    }
422
423    #[test]
424    fn test_runtime_from_entrypoint_config() {
425        let config = EntrypointConfig {
426            checkpointer: None,
427            store: None,
428        };
429        let runtime = Runtime::<TestState>::from_entrypoint_config(&config);
430        assert!(runtime.checkpointer.is_none());
431        assert!(runtime.store.is_none());
432    }
433
434    #[test]
435    fn test_runtime_clone() {
436        let runtime = Runtime::<TestState>::new();
437        let _cloned = runtime.clone();
438        assert!(runtime.previous.is_none());
439        assert!(runtime.checkpointer.is_none());
440    }
441
442    #[test]
443    fn test_compile_entrypoint_basic() {
444        let result = compile_entrypoint::<TestState, TestState, TestState, _>(
445            NodeFnUpdate(|_state: &TestState| async {
446                Ok::<TestStateUpdate, JunctureError>(TestStateUpdate::default())
447            }),
448            None,
449        );
450        result.unwrap();
451    }
452
453    #[test]
454    fn test_compile_entrypoint_with_config() {
455        let retry_policy = crate::graph::RetryPolicy {
456            max_attempts: 3,
457            ..Default::default()
458        };
459        let config = TaskConfig {
460            retry_policy: Some(retry_policy),
461            cache_policy: None,
462            timeout: None,
463            name: Some("custom_entrypoint".to_string()),
464        };
465
466        let result = compile_entrypoint_with_config::<TestState, TestState, TestState, _>(
467            NodeFnUpdate(|_state: &TestState| async {
468                Ok::<TestStateUpdate, JunctureError>(TestStateUpdate::default())
469            }),
470            &config,
471            None,
472        );
473        result.unwrap();
474    }
475}
476
477// Rust guideline compliant 2026-05-23