Skip to main content

wesichain_core/
binding.rs

1use std::marker::PhantomData;
2
3use crate::{Runnable, StreamEvent, WesichainError};
4use async_trait::async_trait;
5use futures::stream::BoxStream;
6
7use futures::StreamExt;
8
9/// A trait for input types that can have arguments bound to them.
10pub trait Bindable: Sized + Send + Sync + 'static {
11    /// Bind arguments to the input.
12    /// The `args` should be a `crate::Value` (typically a JSON object).
13    fn bind(&mut self, args: crate::Value) -> Result<(), WesichainError>;
14}
15
16/// A Runnable that has arguments bound to it.
17pub struct RunnableBinding<R, Input, Output> {
18    pub(crate) bound: R,
19    pub(crate) args: crate::Value,
20    pub(crate) _marker: PhantomData<(Input, Output)>,
21}
22
23impl<R, Input, Output> RunnableBinding<R, Input, Output> {
24    pub fn new(bound: R, args: crate::Value) -> Self {
25        Self {
26            bound,
27            args,
28            _marker: PhantomData,
29        }
30    }
31}
32
33#[async_trait]
34impl<R, Input, Output> Runnable<Input, Output> for RunnableBinding<R, Input, Output>
35where
36    R: Runnable<Input, Output> + Send + Sync,
37    Input: Bindable + Clone + Send + 'static,
38    Output: Send + Sync + 'static,
39{
40    async fn invoke(&self, mut input: Input) -> Result<Output, WesichainError> {
41        input.bind(self.args.clone())?;
42        self.bound.invoke(input).await
43    }
44
45    fn stream(&self, mut input: Input) -> BoxStream<'_, Result<StreamEvent, WesichainError>> {
46        // We can't return an error easily from stream establishment, so we log or panic if bind fails?
47        // Better: stream should probably return a stream that starts with an error if bind fails.
48        // For now, let's assume bind logic is simple enough or handle it inside the future stream if possible.
49        // But `stream` returns a BoxStream immediately.
50        // The `Input` is consumed.
51
52        // Correct approach: we need to handle the bind error.
53        // Since stream signature doesn't return Result implementation-wise, we might need adjustments.
54        // However, for now let's try to bind and if it errors, return a stream of one error.
55
56        if let Err(e) = input.bind(self.args.clone()) {
57            return futures::stream::once(async move { Err(e) }).boxed();
58        }
59
60        self.bound.stream(input)
61    }
62}