Skip to main content

synwire_core/runnables/
core.rs

1//! Core trait for all runnable components.
2//!
3//! # Design Decision: `serde_json::Value` as Universal I/O
4//!
5//! `RunnableCore` uses `serde_json::Value` for input/output rather than generic
6//! type parameters (`RunnableCore<I, O>`) because:
7//!
8//! - **Object safety**: traits with generic type parameters cannot be easily
9//!   stored in `Vec<Box<dyn RunnableCore>>` for heterogeneous chains.
10//! - **Composability**: any runnable can be chained with any other without
11//!   explicit type conversion boilerplate.
12//! - **Trade-off**: runtime type checking instead of compile-time, but this
13//!   matches `LangChain` Python's dynamic typing model.
14//! - **Alternative considered**: `Cow<'_, Value>` for zero-copy reads, but the
15//!   ergonomic cost was deemed too high for the typical use case.
16
17use crate::error::SynwireError;
18use crate::runnables::config::RunnableConfig;
19use crate::{BoxFuture, BoxStream};
20
21/// Core trait for all runnable components in a chain.
22///
23/// Uses `serde_json::Value` as the universal input/output type
24/// for composability between heterogeneous runnables.
25///
26/// # Default implementations
27///
28/// - [`batch`](RunnableCore::batch) invokes sequentially over each input.
29/// - [`stream`](RunnableCore::stream) wraps [`invoke`](RunnableCore::invoke)
30///   as a single-item stream.
31///
32/// # Lifetime parameter
33///
34/// All async methods share a single lifetime `'a` that ties together
35/// `&'a self` and `Option<&'a RunnableConfig>`, ensuring the returned
36/// future can borrow both.
37///
38/// # Cancel safety
39///
40/// [`invoke`](Self::invoke) and [`batch`](Self::batch) are **not
41/// cancel-safe** in general -- the behaviour depends on the concrete
42/// implementation. Dropping `batch` mid-execution may leave earlier
43/// inputs processed but later inputs unprocessed. The [`BoxStream`]
44/// returned by [`stream`](Self::stream) can be safely dropped at any
45/// point.
46pub trait RunnableCore: Send + Sync {
47    /// Invoke the runnable with a single input.
48    fn invoke<'a>(
49        &'a self,
50        input: serde_json::Value,
51        config: Option<&'a RunnableConfig>,
52    ) -> BoxFuture<'a, Result<serde_json::Value, SynwireError>>;
53
54    /// Invoke on multiple inputs. Default implementation calls
55    /// [`invoke`](RunnableCore::invoke) sequentially for each input.
56    fn batch<'a>(
57        &'a self,
58        inputs: Vec<serde_json::Value>,
59        config: Option<&'a RunnableConfig>,
60    ) -> BoxFuture<'a, Result<Vec<serde_json::Value>, SynwireError>> {
61        Box::pin(async move {
62            let mut results = Vec::with_capacity(inputs.len());
63            for input in inputs {
64                results.push(self.invoke(input, config).await?);
65            }
66            Ok(results)
67        })
68    }
69
70    /// Stream results. Default implementation wraps [`invoke`](RunnableCore::invoke)
71    /// as a single-item stream.
72    fn stream<'a>(
73        &'a self,
74        input: serde_json::Value,
75        config: Option<&'a RunnableConfig>,
76    ) -> BoxFuture<'a, Result<BoxStream<'a, Result<serde_json::Value, SynwireError>>, SynwireError>>
77    {
78        Box::pin(async move {
79            let result = self.invoke(input, config).await;
80            let stream: BoxStream<'a, Result<serde_json::Value, SynwireError>> =
81                Box::pin(futures_util::stream::iter(vec![result]));
82            Ok(stream)
83        })
84    }
85
86    /// Get the runnable's name for debugging.
87    #[allow(clippy::unnecessary_literal_bound)]
88    fn name(&self) -> &str {
89        "RunnableCore"
90    }
91}