Skip to main content

langgraph_core_rs/runnable/
callable.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use langgraph_checkpoint::config::RunnableConfig;
7use super::base::{Runnable, RunnableError};
8
9/// A clonable async function: (input, config) -> future(result).
10///
11/// Uses `Arc` so the function can be cloned and shared across threads.
12pub type BoxedFn = Arc<
13    dyn Fn(
14            JsonValue,
15            RunnableConfig,
16        ) -> Pin<Box<dyn Future<Output = Result<JsonValue, RunnableError>> + Send>>
17        + Send
18        + Sync,
19>;
20
21/// Wraps a function as a `Runnable`.
22///
23/// This is the Rust equivalent of Python's `RunnableCallable`.
24/// Functions always receive `(JsonValue, RunnableConfig)` — the caller
25/// is responsible for marshalling state into `JsonValue`.
26pub struct RunnableCallable {
27    name: String,
28    func: BoxedFn,
29}
30
31impl RunnableCallable {
32    /// Create from an async function.
33    pub fn new<F, Fut>(name: impl Into<String>, f: F) -> Self
34    where
35        F: Fn(JsonValue, RunnableConfig) -> Fut + Send + Sync + 'static,
36        Fut: Future<Output = Result<JsonValue, RunnableError>> + Send + 'static,
37    {
38        Self {
39            name: name.into(),
40            func: Arc::new(move |input, config| Box::pin(f(input, config))),
41        }
42    }
43
44    /// Create from a sync function (wrapped as async).
45    pub fn new_sync<F>(name: impl Into<String>, f: F) -> Self
46    where
47        F: Fn(&JsonValue, &RunnableConfig) -> Result<JsonValue, RunnableError> + Send + Sync + 'static,
48    {
49        let f = Arc::new(f);
50        Self {
51            name: name.into(),
52            func: Arc::new(move |input, config| {
53                let f = f.clone();
54                Box::pin(async move { f(&input, &config) })
55            }),
56        }
57    }
58}
59
60#[async_trait]
61impl Runnable for RunnableCallable {
62    fn invoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
63        let func = self.func.clone();
64        let input = input.clone();
65        let config = config.clone();
66
67        // Try to use existing tokio runtime, otherwise create one
68        match tokio::runtime::Handle::try_current() {
69            Ok(handle) => handle.block_on(crate::config::with_config(config.clone(), func(input, config))),
70            Err(_) => tokio::runtime::Runtime::new()
71                .unwrap()
72                .block_on(func(input, config)),
73        }
74    }
75
76    async fn ainvoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
77        let func = self.func.clone();
78        let input = input.clone();
79        let config_inner = config.clone();
80        crate::config::with_config(config.clone(), func(input, config_inner)).await
81    }
82
83    fn name(&self) -> &str {
84        &self.name
85    }
86}