flow_lib/command/
builder.rs

//! Helper for building commands from node-definition files.
2//!
3//! Node-definition files are JSON files matching the [`Definition`] struct.
4//!
5//! # Example
6//!
7//! A command that adds 2 numbers:
8//! ```
9//! use flow_lib::command::{*, prelude::*, builder::*};
10//!
11//! inventory::submit!(CommandDescription::new("add", |_| build()));
12//!
13//! const DEFINITION: &str = r#"
14//! {
15//!   "type": "native",
16//!   "data": {
17//!     "node_id": "add"
18//!   },
19//!   "sources": [
20//!     {
21//!       "name": "result",
22//!       "type": "i64"
23//!     }
24//!   ],
25//!   "targets": [
26//!     {
27//!       "name": "a",
28//!       "type_bounds": ["i64"],
29//!       "required": true,
30//!       "passthrough": false
31//!     },
32//!     {
33//!       "name": "b",
34//!       "type_bounds": ["i64"],
35//!       "required": true,
36//!       "passthrough": false
37//!     }
38//!   ]
39//! }
40//! "#;
41//!
42//! fn build() -> BuildResult {
43//!     static CACHE: BuilderCache = BuilderCache::new(|| {
44//!         CmdBuilder::new(DEFINITION)?
45//!             .check_name("add")
46//!     });
47//!     Ok(CACHE.clone()?.build(run))
48//! }
49//!
50//! #[derive(serde::Deserialize, Debug)]
51//! struct Input {
52//!     a: i64,
53//!     b: i64,
54//! }
55//!
56//! #[derive(serde::Serialize, Debug)]
57//! struct Output {
58//!     result: i64,
59//! }
60//!
61//! async fn run(_: CommandContext, input: Input) -> Result<Output, CommandError> {
62//!     Ok(Output { result: input.a + input.b })
63//! }
64//! ```
65
66use super::{CommandError, CommandTrait, FnNewResult};
67use crate::{
68    Name,
69    command::InstructionInfo,
70    config::node::{Definition, Permissions},
71    context::CommandContext,
72    utils::LocalBoxFuture,
73};
74use serde::{Serialize, de::DeserializeOwned};
75use std::{future::Future, sync::LazyLock};
76use thiserror::Error as ThisError;
77
78/// `fn build() -> BuildResult`.
79pub type BuildResult = FnNewResult;
80
81/// Use this to cache computation such as parsing JSON node-definition.
82pub type BuilderCache = LazyLock<Result<CmdBuilder, BuilderError>>;
83
84/// Create a command from node-definition file and an `async fn run()` function.
85#[derive(Clone)]
86pub struct CmdBuilder {
87    def: Definition,
88    signature_name: Option<String>,
89}
90
91#[derive(ThisError, Debug, Clone)]
92pub enum BuilderError {
93    #[error("{0}")]
94    Json(String),
95    #[error("wrong command name: {0}")]
96    WrongName(String),
97    #[error("output not found: {0}")]
98    OutputNotFound(String),
99}
100
101impl From<serde_json::Error> for BuilderError {
102    fn from(value: serde_json::Error) -> Self {
103        BuilderError::Json(value.to_string())
104    }
105}
106
107impl CmdBuilder {
108    /// Start building command with a JSON node-definition.
109    /// Most of the time you would use [`include_str`] to get the file content and pass to this.
110    pub fn new(def: &str) -> Result<Self, serde_json::Error> {
111        let def = serde_json::from_str(def)?;
112        Ok(Self {
113            def,
114            signature_name: None,
115        })
116    }
117
118    /// Check that the command name in node-definition is equal to this name, to prevent accidentally
119    /// using the wrong node-definition.
120    pub fn check_name(self, name: &str) -> Result<Self, BuilderError> {
121        if self.def.data.node_id == name {
122            Ok(self)
123        } else {
124            Err(BuilderError::WrongName(self.def.data.node_id))
125        }
126    }
127
128    /// Set permissions of the command.
129    pub fn permissions(mut self, p: Permissions) -> Self {
130        self.def.permissions = p;
131        self
132    }
133
134    /// Use an [`InstructionInfo::simple`] for this command.
135    pub fn simple_instruction_info(mut self, signature_name: &str) -> Result<Self, BuilderError> {
136        if self.def.sources.iter().any(|x| x.name == signature_name) {
137            self.signature_name = Some(signature_name.to_owned());
138            Ok(self)
139        } else {
140            Err(BuilderError::OutputNotFound(signature_name.to_owned()))
141        }
142    }
143
144    /// Build the command, `f` will be used as this command's [`fn run()`][CommandTrait::run].
145    ///
146    /// - `f` must be an `async fn(Context, Input) -> Result<Output, CommandError>`.
147    /// - `Input` must implement [`DeserializeOwned`].
148    /// - `Output` must implement [`Serialize`].
149    pub fn build<T, U, Fut, F>(self, f: F) -> Box<dyn CommandTrait>
150    where
151        F: Fn(CommandContext, T) -> Fut + Send + Sync + 'static,
152        Fut: Future<Output = Result<U, CommandError>> + Send + 'static,
153        T: DeserializeOwned + 'static,
154        U: Serialize,
155    {
156        struct Command<T, Fut> {
157            name: Name,
158            inputs: Vec<crate::CmdInputDescription>,
159            outputs: Vec<crate::CmdOutputDescription>,
160            instruction_info: Option<InstructionInfo>,
161            permissions: Permissions,
162            run: Box<dyn Fn(CommandContext, T) -> Fut + Send + Sync + 'static>,
163        }
164
165        impl<T, U, Fut> CommandTrait for Command<T, Fut>
166        where
167            Fut: Future<Output = Result<U, CommandError>> + Send + 'static,
168            T: DeserializeOwned + 'static,
169            U: Serialize,
170        {
171            fn name(&self) -> Name {
172                self.name.clone()
173            }
174
175            fn instruction_info(&self) -> Option<InstructionInfo> {
176                self.instruction_info.clone()
177            }
178
179            fn inputs(&self) -> Vec<crate::CmdInputDescription> {
180                self.inputs.clone()
181            }
182
183            fn outputs(&self) -> Vec<crate::CmdOutputDescription> {
184                self.outputs.clone()
185            }
186
187            fn run<'a: 'b, 'b>(
188                &'a self,
189                ctx: CommandContext,
190                params: crate::ValueSet,
191            ) -> LocalBoxFuture<'b, Result<crate::ValueSet, CommandError>> {
192                match value::from_map(params) {
193                    Ok(input) => {
194                        let fut = (self.run)(ctx, input);
195                        Box::pin(async move { Ok(value::to_map(&fut.await?)?) })
196                    }
197                    Err(error) => Box::pin(async move { Err(error.into()) }),
198                }
199            }
200
201            fn permissions(&self) -> Permissions {
202                self.permissions.clone()
203            }
204        }
205
206        let mut cmd = Command {
207            name: self.def.data.node_id.clone(),
208            run: Box::new(f),
209            inputs: self
210                .def
211                .targets
212                .into_iter()
213                .map(|x| crate::CmdInputDescription {
214                    name: x.name,
215                    type_bounds: x.type_bounds,
216                    required: x.required,
217                    passthrough: x.passthrough,
218                })
219                .collect(),
220            outputs: self
221                .def
222                .sources
223                .into_iter()
224                .map(|x| crate::CmdOutputDescription {
225                    name: x.name,
226                    r#type: x.r#type,
227                    optional: x.optional,
228                })
229                .collect(),
230            instruction_info: self.def.data.instruction_info,
231            permissions: self.def.permissions,
232        };
233
234        if let Some(name) = self.signature_name {
235            cmd.instruction_info = Some(InstructionInfo::simple(&cmd, &name))
236        }
237
238        Box::new(cmd)
239    }
240}