Skip to main content

lutra_runner/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3pub mod binary;
4
5#[cfg(feature = "channel")]
6pub mod channel;
7
8#[cfg(feature = "sync")]
9mod sync;
10
11#[cfg(feature = "async")]
12mod r#async;
13
14#[cfg(feature = "sync")]
15pub use sync::SyncRunner;
16
17#[cfg(feature = "async")]
18pub use r#async::AsyncRunner;
19
20use lutra_bin::{rr, string, vec};
21
22// Runner Posix Interface:
23// - an executable that can read arguments and env variables,
24// - Runner RPC over stdin+stdout,
25
26// Runner Binary Interface:
27// - requires duplex stream,
28// - passes messages encoded as lutra-bin, see messages.lt
29
30// callers will be able to invoke
31// - types implementing 'Runner Rust Interface' trait,
32// - Binary streams, via `runner::binary::Client` wrapper,
33// - Posix executables, via `runner::posix::Client` wrapper,
34// - HTTP servers, via `runner::http::Client`,
35
36// lutra-runner-postgres will be accessible:
37// - via using `Runner` trait,
38// - through `runner::binary::Server`,
39// - through `runner::posix::Server`,
40// - over HTTP, via `runner::http::Server`,
41
42/// Ability to execute a lutra program.
43pub trait Run {
44    type Error: core::fmt::Debug;
45    type Prepared;
46
47    /// Run a program.
48    ///
49    /// This is helper function for [Run::prepare] followed by [Run::execute],
50    /// with input encoding and output decoding.
51    fn run<I, O>(
52        &self,
53        program: &rr::TypedProgram<I, O>,
54        input: &I,
55    ) -> impl Future<Output = Result<lutra_bin::Result<O>, Self::Error>>
56    where
57        I: lutra_bin::Encode,
58        O: lutra_bin::Decode,
59    {
60        async {
61            let input = input.encode();
62            let handle = self.prepare(program.inner.clone()).await?;
63            let output = self.execute(&handle, &input).await?;
64            Ok(O::decode(&output))
65        }
66    }
67
68    /// Prepares a program for execution and returns a handle, which can be
69    /// used with [Run::execute].
70    ///
71    /// If the program is invalid, error is returned either now or later by [Run::execute].
72    ///
73    /// When the handle is returned, the program might not be
74    /// fully prepared yet, so first execution of the program
75    /// might take longer then subsequent [Run::execute] calls.
76    fn prepare(
77        &self,
78        program: rr::Program,
79    ) -> impl Future<Output = Result<Self::Prepared, Self::Error>>;
80
81    /// Execute a prepared program.
82    /// Program's format must match the format supported by this runner.
83    fn execute(
84        &self,
85        program: &Self::Prepared,
86        input: &[u8],
87    ) -> impl Future<Output = Result<vec::Vec<u8>, Self::Error>>;
88
89    /// Return static interface of this runner as Lutra source code.
90    ///
91    /// Runners can provide implementations for functions that are not part of
92    /// standard library. This function returns definitions of these functions as
93    /// Lutra source code.
94    ///
95    /// For example: interpreter can provide `fs::read_parquet()`
96    /// and PostgreSQL runner can provide `sql::read_table()`.
97    fn get_interface(&self) -> impl Future<Output = Result<string::String, Self::Error>> {
98        async { Ok(string::String::new()) }
99    }
100
101    /// Releases any claimed resources or network connections.
102    fn shutdown(&self) -> impl Future<Output = Result<(), Self::Error>> {
103        async { Ok(()) }
104    }
105}
106
107/// Synchronous version of the Run trait for runners that don't block the process.
108///
109/// This trait should only be implemented for runners that:
110/// - Execute entirely in-memory (like the interpreter)
111/// - Use internal thread pools for blocking operations (like DuckDB)
112///
113/// Do NOT implement this for runners that perform actual async I/O operations
114/// (like network database connections).
115///
116/// Methods take `&mut self` since synchronous operations may need to mutate
117/// internal state (like database connections).
118pub trait RunSync {
119    type Error: core::fmt::Debug;
120    type Prepared;
121
122    /// Run a program.
123    ///
124    /// This is helper function for [RunSync::prepare_sync] followed by [RunSync::execute_sync],
125    /// with input encoding and output decoding.
126    fn run_sync<I, O>(
127        &mut self,
128        program: &rr::TypedProgram<I, O>,
129        input: &I,
130    ) -> Result<lutra_bin::Result<O>, Self::Error>
131    where
132        I: lutra_bin::Encode,
133        O: lutra_bin::Decode,
134    {
135        let input = input.encode();
136        let handle = self.prepare_sync(program.inner.clone())?;
137        let output = self.execute_sync(&handle, &input)?;
138        Ok(O::decode(&output))
139    }
140
141    /// Prepares a program for execution and returns a handle, which can be
142    /// used with [RunSync::execute_sync].
143    ///
144    /// If the program is invalid, error is returned either now or later by [RunSync::execute_sync].
145    ///
146    /// When the handle is returned, the program might not be
147    /// fully prepared yet, so first execution of the program
148    /// might take longer then subsequent [RunSync::execute_sync] calls.
149    fn prepare_sync(&mut self, program: rr::Program) -> Result<Self::Prepared, Self::Error>;
150
151    /// Execute a prepared program synchronously.
152    /// Program's format must match the format supported by this runner.
153    fn execute_sync(
154        &mut self,
155        program: &Self::Prepared,
156        input: &[u8],
157    ) -> Result<vec::Vec<u8>, Self::Error>;
158
159    /// Return static interface of this runner as Lutra source code.
160    ///
161    /// Runners can provide implementations for functions that are not part of
162    /// standard library. This function returns definitions of these functions as
163    /// Lutra source code.
164    ///
165    /// For example: interpreter can provide `fs::read_parquet()`
166    /// and DuckDB runner can provide `sql::read_table()`.
167    fn get_interface_sync(&mut self) -> Result<string::String, Self::Error> {
168        Ok(string::String::new())
169    }
170
171    /// Releases any claimed resources.
172    fn shutdown_sync(&mut self) -> Result<(), Self::Error> {
173        Ok(())
174    }
175}
176
177/// Standard error codes used in runner protocol
178pub mod error_codes {
179    pub const DECODE_ERROR: &str = "DECODE_ERROR";
180    pub const PROGRAM_NOT_FOUND: &str = "PROGRAM_NOT_FOUND";
181    pub const EXECUTION_ERROR: &str = "EXECUTION_ERROR";
182    pub const PREPARE_ERROR: &str = "PREPARE_ERROR";
183}