flow_lib/command/
mod.rs

1//! [`CommandTrait`] and command [`builder`].
2//!
3//! To make a new [`native`][crate::config::CommandType::Native] command:
//! 1. Implement [`CommandTrait`] in one of two ways:
//!     - Manually implement it for your types.
//!     - Use the [`builder`] helper.
7//! 2. Use [`inventory::submit`] with a [`CommandDescription`] to register the command at compile-time.
8
9use crate::{
10    CommandType, ValueType,
11    config::{
12        CmdInputDescription, CmdOutputDescription, Name, ValueSet,
13        client::{self, NodeData},
14        node::Permissions,
15    },
16    context::CommandContext,
17};
18use futures::future::{Either, LocalBoxFuture, OptionFuture};
19use regex::Regex;
20use serde::{Deserialize, Serialize};
21use std::{borrow::Cow, collections::BTreeMap, future::ready};
22use uuid::Uuid;
23use value::Value;
24
25pub mod builder;
26
27/// Import common types for writing commands.
28pub mod prelude {
29    pub use crate::{
30        CmdInputDescription, CmdInputDescription as Input, CmdOutputDescription,
31        CmdOutputDescription as Output, FlowId, Name, ValueSet, ValueType,
32        command::{
33            CommandDescription, CommandError, CommandTrait, InstructionInfo,
34            builder::{BuildResult, BuilderCache, BuilderError, CmdBuilder},
35        },
36        config::{client::NodeData, node::Permissions},
37        context::CommandContext,
38        solana::Instructions,
39    };
40    pub use async_trait::async_trait;
41    pub use bytes::Bytes;
42    pub use futures::future::Either;
43    pub use serde::{Deserialize, Serialize};
44    pub use serde_json::Value as JsonValue;
45    pub use serde_with::serde_as;
46    pub use solana_keypair::Keypair;
47    pub use solana_pubkey::Pubkey;
48    pub use solana_signature::Signature;
49    pub use thiserror::Error as ThisError;
50    pub use value::{
51        self, Decimal, Value,
52        with::{AsDecimal, AsKeypair, AsPubkey, AsSignature},
53    };
54}
55
/// Error type of commands.
57pub type CommandError = anyhow::Error;
58
59/// Generic trait for implementing commands.
60#[async_trait::async_trait(?Send)]
61pub trait CommandTrait: 'static {
62    fn r#type(&self) -> CommandType {
63        CommandType::Native
64    }
65
66    /// Unique name to identify the command.
67    fn name(&self) -> Name;
68
69    /// List of inputs that the command can receive.
70    fn inputs(&self) -> Vec<CmdInputDescription>;
71
72    /// List of outputs that the command will return.
73    fn outputs(&self) -> Vec<CmdOutputDescription>;
74
75    /// Run the command.
76    async fn run(&self, ctx: CommandContext, params: ValueSet) -> Result<ValueSet, CommandError>;
77
78    /// Specify if and how would this command output Solana instructions.
79    fn instruction_info(&self) -> Option<InstructionInfo> {
80        None
81    }
82
83    /// Specify requested permissions of this command.
84    fn permissions(&self) -> Permissions {
85        Permissions::default()
86    }
87
88    /// Async `Drop` method.
89    async fn destroy(&mut self) {}
90
91    /// Specify how [`form_data`][crate::config::NodeConfig::form_data] are read.
92    fn read_form_data(&self, data: serde_json::Value) -> ValueSet {
93        let mut result = ValueSet::new();
94        for i in self.inputs() {
95            if let Some(json) = data.get(&i.name) {
96                let value = Value::from(json.clone());
97                result.insert(i.name.clone(), value);
98            }
99        }
100        result
101    }
102
103    fn node_data(&self) -> NodeData {
104        default_node_data(self)
105    }
106}
107
108/// Specify how to convert inputs into passthrough outputs.
109pub fn passthrough_outputs<T: CommandTrait + ?Sized>(cmd: &T, inputs: &ValueSet) -> ValueSet {
110    let mut res = ValueSet::new();
111    for i in cmd.inputs() {
112        if i.passthrough
113            && let Some(value) = inputs.get(&i.name)
114        {
115            if !i.required && matches!(value, Value::Null) {
116                continue;
117            }
118
119            let value = match i.type_bounds.first() {
120                Some(ValueType::Pubkey) => {
121                    // keypair could be automatically converted into pubkey
122                    // we don't want to passthrough the keypair here, only pubkey
123                    value::pubkey::deserialize(value.clone()).map(Into::into)
124                }
125                _ => Ok(value.clone()),
126            }
127            .unwrap_or_else(|error| {
128                tracing::warn!("error reading passthrough: {}", error);
129                value.clone()
130            });
131            res.insert(i.name, value);
132        }
133    }
134    res
135}
136
137/// Specify how [`form_data`][crate::config::NodeConfig::form_data] are read.
138pub fn default_read_form_data<T: CommandTrait + ?Sized>(
139    cmd: &T,
140    data: serde_json::Value,
141) -> ValueSet {
142    let mut result = ValueSet::new();
143    for i in cmd.inputs() {
144        if let Some(json) = data.get(&i.name) {
145            let value = Value::from(json.clone());
146            result.insert(i.name.clone(), value);
147        }
148    }
149    result
150}
151
152pub fn default_node_data<T: CommandTrait + ?Sized>(cmd: &T) -> NodeData {
153    NodeData {
154        r#type: cmd.r#type(),
155        node_id: cmd.name(),
156        sources: cmd
157            .outputs()
158            .into_iter()
159            .map(|output| client::Source {
160                id: Uuid::nil(),
161                name: output.name,
162                r#type: output.r#type,
163                optional: output.optional,
164            })
165            .collect(),
166        targets: cmd
167            .inputs()
168            .into_iter()
169            .map(|input| client::Target {
170                id: Uuid::nil(),
171                name: input.name,
172                type_bounds: input.type_bounds,
173                required: input.required,
174                passthrough: input.passthrough,
175            })
176            .collect(),
177        targets_form: client::TargetsForm::default(),
178        instruction_info: cmd.instruction_info(),
179    }
180}
181
182pub fn input_is_required<T: CommandTrait + ?Sized>(cmd: &T, name: &str) -> Option<bool> {
183    cmd.inputs()
184        .into_iter()
185        .find_map(|i| (i.name == name).then_some(i.required))
186}
187
188pub fn output_is_optional<T: CommandTrait + ?Sized>(cmd: &T, name: &str) -> Option<bool> {
189    cmd.outputs()
190        .into_iter()
191        .find_map(|o| (o.name == name).then_some(o.optional))
192        .or_else(|| {
193            cmd.inputs()
194                .into_iter()
195                .find_map(|i| (i.name == name && i.passthrough).then_some(!i.required))
196        })
197}
198
/// Specify the order with which a command will return its output:
/// - [`before`][InstructionInfo::before]: list of output names returned before instructions are sent.
/// - [`signature`][InstructionInfo::signature]: name of the signature output port.
/// - [`after`][InstructionInfo::after]: list of output names returned after instructions are sent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InstructionInfo {
    /// Output names returned before instructions are sent.
    pub before: Vec<Name>,
    /// Name of the signature output port.
    pub signature: Name,
    /// Output names returned after instructions are sent.
    pub after: Vec<Name>,
}
209
210impl InstructionInfo {
211    /// Simple `InstructionInfo` that can describe most commands:
212    /// - [`before`][InstructionInfo::before]: All passthroughs and outputs, except for `signature`.
213    /// - [`after`][InstructionInfo::after]: empty.
214    pub fn simple<C: CommandTrait>(cmd: &C, signature: &str) -> Self {
215        let before = cmd
216            .inputs()
217            .into_iter()
218            .filter(|i| i.passthrough)
219            .map(|i| i.name)
220            .chain(
221                cmd.outputs()
222                    .into_iter()
223                    .filter(|o| o.name != signature)
224                    .map(|o| o.name),
225            )
226            .collect();
227        Self {
228            before,
229            after: Vec::new(),
230            signature: signature.into(),
231        }
232    }
233}
234
/// Name part of a [`MatchCommand`]: either a literal name or a pattern.
///
/// `bincode::Decode` is implemented by hand for this type rather than derived.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, bincode::Encode)]
pub enum MatchName {
    /// Matches only the name equal to this string.
    Exact(Cow<'static, str>),
    /// Regular-expression source; matches any name the compiled
    /// `regex::Regex` accepts.
    Regex(Cow<'static, str>),
}
240
241// generated by bincode macro
242impl<__Context> ::bincode::Decode<__Context> for MatchName {
243    fn decode<__D: ::bincode::de::Decoder<Context = __Context>>(
244        decoder: &mut __D,
245    ) -> core::result::Result<Self, ::bincode::error::DecodeError> {
246        let variant_index = <u32 as ::bincode::Decode<__D::Context>>::decode(decoder)?;
247        match variant_index {
248            0u32 => core::result::Result::Ok(Self::Exact(
249                ::bincode::Decode::<__D::Context>::decode(decoder)?,
250            )),
251            1u32 => core::result::Result::Ok(Self::Regex(
252                ::bincode::Decode::<__D::Context>::decode(decoder)?,
253            )),
254            variant => {
255                core::result::Result::Err(::bincode::error::DecodeError::UnexpectedVariant {
256                    found: variant,
257                    type_name: "MatchName",
258                    allowed: &::bincode::error::AllowedEnumVariants::Range { min: 0, max: 1 },
259                })
260            }
261        }
262    }
263}
264
265impl<'de, C> bincode::BorrowDecode<'de, C> for MatchName {
266    fn borrow_decode<D: bincode::de::BorrowDecoder<'de, Context = C>>(
267        decoder: &mut D,
268    ) -> Result<Self, bincode::error::DecodeError> {
269        bincode::Decode::decode(decoder)
270    }
271}
272
273impl std::fmt::Debug for MatchName {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        match self {
276            Self::Exact(arg0) => arg0.fmt(f),
277            Self::Regex(arg0) => {
278                f.write_str("/")?;
279                f.write_str(arg0)?;
280                f.write_str("/")
281            }
282        }
283    }
284}
285
286impl std::fmt::Display for MatchName {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        match self {
289            MatchName::Exact(cow) => cow.fmt(f),
290            MatchName::Regex(cow) => cow.fmt(f),
291        }
292    }
293}
294
295#[derive(Clone, bincode::Encode, bincode::Decode, PartialEq, Eq, PartialOrd, Ord)]
296pub struct MatchCommand {
297    pub r#type: CommandType,
298    pub name: MatchName,
299}
300
301impl std::fmt::Debug for MatchCommand {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        self.r#type.fmt(f)?;
304        f.write_str(":")?;
305        self.name.fmt(f)
306    }
307}
308
309impl MatchCommand {
310    pub fn is_match(&self, ty: CommandType, name: &str) -> bool {
311        self.r#type == ty
312            && match &self.name {
313                MatchName::Exact(cow) => cow == name,
314                MatchName::Regex(cow) => Regex::new(cow) // TODO: slow
315                    .map(|re| re.is_match(name))
316                    .ok()
317                    .unwrap_or(false),
318            }
319    }
320}
321
/// Result of constructing a command: a boxed [`CommandTrait`] object on
/// success, a [`CommandError`] otherwise.
pub type FnNewResult = Result<Box<dyn CommandTrait>, CommandError>;
323
324/// Use [`inventory::submit`] to register commands at compile-time.
325#[derive(Clone)]
326pub struct CommandDescription {
327    pub matcher: MatchCommand,
328    /// Function to initialize the command from a [`NodeData`].
329    pub fn_new:
330        Either<fn(&NodeData) -> FnNewResult, fn(&NodeData) -> LocalBoxFuture<'static, FnNewResult>>,
331}
332
333impl CommandDescription {
334    pub const fn new(name: &'static str, fn_new: fn(&NodeData) -> FnNewResult) -> Self {
335        Self {
336            matcher: MatchCommand {
337                r#type: CommandType::Native,
338                name: MatchName::Exact(Cow::Borrowed(name)),
339            },
340            fn_new: Either::Left(fn_new),
341        }
342    }
343}
344
345inventory::collect!(CommandDescription);
346
347pub fn collect_commands() -> BTreeMap<&'static MatchCommand, &'static CommandDescription> {
348    inventory::iter::<CommandDescription>()
349        .map(|c| (&c.matcher, c))
350        .collect()
351}
352
353#[derive(Debug, Clone)]
354pub struct CommandIndex<T> {
355    pub exact_match: BTreeMap<(CommandType, Cow<'static, str>), T>,
356    pub regex: Vec<(CommandType, regex::Regex, T)>,
357}
358
359impl<T> Default for CommandIndex<T> {
360    fn default() -> Self {
361        Self {
362            exact_match: <_>::default(),
363            regex: <_>::default(),
364        }
365    }
366}
367
368impl<T> FromIterator<(MatchCommand, T)> for CommandIndex<T> {
369    fn from_iter<I: IntoIterator<Item = (MatchCommand, T)>>(iter: I) -> Self {
370        let mut this = Self::default();
371        for (matcher, t) in iter {
372            match &matcher.name {
373                MatchName::Exact(cow) => {
374                    this.exact_match.insert((matcher.r#type, cow.clone()), t);
375                }
376                MatchName::Regex(cow) => {
377                    this.regex
378                        .push((matcher.r#type, Regex::new(cow).expect("invalid regex"), t));
379                }
380            }
381        }
382        this
383    }
384}
385
386impl<T> CommandIndex<T> {
387    pub fn get(&self, ty: CommandType, name: &str) -> Option<&T> {
388        if let Some(d) = self.exact_match.get(&(ty, name.to_owned().into())) {
389            Some(d)
390        } else {
391            let mut matched = None;
392            for r in &self.regex {
393                if r.0 == ty && r.1.is_match(name) {
394                    matched = Some(&r.2);
395                }
396            }
397            matched
398        }
399    }
400
401    pub fn availables(&self) -> impl Iterator<Item = MatchCommand> {
402        self.exact_match
403            .keys()
404            .cloned()
405            .map(|(r#type, name)| MatchCommand {
406                r#type,
407                name: MatchName::Exact(name),
408            })
409            .chain(self.regex.iter().map(|(ty, regex, _)| MatchCommand {
410                r#type: *ty,
411                name: MatchName::Regex(regex.to_string().into()),
412            }))
413    }
414}
415
416#[derive(Clone)]
417pub struct CommandFactory {
418    index: CommandIndex<&'static CommandDescription>,
419}
420
421impl CommandFactory {
422    pub fn collect() -> Self {
423        Self {
424            index: inventory::iter::<CommandDescription>()
425                .map(|c| (c.matcher.clone(), c))
426                .collect(),
427        }
428    }
429
430    pub fn init(
431        &self,
432        nd: &NodeData,
433    ) -> impl Future<Output = Result<Option<Box<dyn CommandTrait>>, CommandError>> + 'static {
434        let cmd = self.index.get(nd.r#type, &nd.node_id);
435
436        let either = cmd.map(|cmd| match cmd.fn_new {
437            Either::Left(fn_new) => Either::Left(ready(fn_new(nd))),
438            Either::Right(async_fn_new) => Either::Right(async_fn_new(nd)),
439        });
440        async move { OptionFuture::from(either).await.transpose() }
441    }
442
443    pub fn availables(&self) -> impl Iterator<Item = MatchCommand> {
444        self.index.availables()
445    }
446}