lutra_runner/lib.rs
1#![cfg_attr(not(feature = "std"), no_std)]
2
3pub mod binary;
4
5#[cfg(feature = "channel")]
6pub mod channel;
7
8#[cfg(feature = "sync")]
9mod sync;
10
11#[cfg(feature = "async")]
12mod r#async;
13
14#[cfg(feature = "sync")]
15pub use sync::SyncRunner;
16
17#[cfg(feature = "async")]
18pub use r#async::AsyncRunner;
19
20use lutra_bin::{rr, string, vec};
21
22// Runner Posix Interface:
23// - an executable that can read arguments and env variables,
24// - Runner RPC over stdin+stdout,
25
26// Runner Binary Interface:
27// - requires duplex stream,
28// - passes messages encoded as lutra-bin, see messages.lt
29
30// callers will be able to invoke
31// - types implementing 'Runner Rust Interface' trait,
32// - Binary streams, via `runner::binary::Client` wrapper,
33// - Posix executables, via `runner::posix::Client` wrapper,
34// - HTTP servers, via `runner::http::Client`,
35
// lutra-runner-postgres will be accessible:
// - directly, via the `Run` trait,
// - through `runner::binary::Server`,
// - through `runner::posix::Server`,
// - over HTTP, via `runner::http::Server`.
41
42/// Ability to execute a lutra program.
43pub trait Run {
44 type Error: core::fmt::Debug;
45 type Prepared;
46
47 /// Run a program.
48 ///
49 /// This is helper function for [Run::prepare] followed by [Run::execute],
50 /// with input encoding and output decoding.
51 fn run<I, O>(
52 &self,
53 program: &rr::TypedProgram<I, O>,
54 input: &I,
55 ) -> impl Future<Output = Result<lutra_bin::Result<O>, Self::Error>>
56 where
57 I: lutra_bin::Encode,
58 O: lutra_bin::Decode,
59 {
60 async {
61 let input = input.encode();
62 let handle = self.prepare(program.inner.clone()).await?;
63 let output = self.execute(&handle, &input).await?;
64 Ok(O::decode(&output))
65 }
66 }
67
68 /// Prepares a program for execution and returns a handle, which can be
69 /// used with [Run::execute].
70 ///
71 /// If the program is invalid, error is returned either now or later by [Run::execute].
72 ///
73 /// When the handle is returned, the program might not be
74 /// fully prepared yet, so first execution of the program
75 /// might take longer then subsequent [Run::execute] calls.
76 fn prepare(
77 &self,
78 program: rr::Program,
79 ) -> impl Future<Output = Result<Self::Prepared, Self::Error>>;
80
81 /// Execute a prepared program.
82 /// Program's format must match the format supported by this runner.
83 fn execute(
84 &self,
85 program: &Self::Prepared,
86 input: &[u8],
87 ) -> impl Future<Output = Result<vec::Vec<u8>, Self::Error>>;
88
89 /// Return static interface of this runner as Lutra source code.
90 ///
91 /// Runners can provide implementations for functions that are not part of
92 /// standard library. This function returns definitions of these functions as
93 /// Lutra source code.
94 ///
95 /// For example: interpreter can provide `fs::read_parquet()`
96 /// and PostgreSQL runner can provide `sql::read_table()`.
97 fn get_interface(&self) -> impl Future<Output = Result<string::String, Self::Error>> {
98 async { Ok(string::String::new()) }
99 }
100
101 /// Releases any claimed resources or network connections.
102 fn shutdown(&self) -> impl Future<Output = Result<(), Self::Error>> {
103 async { Ok(()) }
104 }
105}
106
107/// Synchronous version of the Run trait for runners that don't block the process.
108///
109/// This trait should only be implemented for runners that:
110/// - Execute entirely in-memory (like the interpreter)
111/// - Use internal thread pools for blocking operations (like DuckDB)
112///
113/// Do NOT implement this for runners that perform actual async I/O operations
114/// (like network database connections).
115///
116/// Methods take `&mut self` since synchronous operations may need to mutate
117/// internal state (like database connections).
118pub trait RunSync {
119 type Error: core::fmt::Debug;
120 type Prepared;
121
122 /// Run a program.
123 ///
124 /// This is helper function for [RunSync::prepare_sync] followed by [RunSync::execute_sync],
125 /// with input encoding and output decoding.
126 fn run_sync<I, O>(
127 &mut self,
128 program: &rr::TypedProgram<I, O>,
129 input: &I,
130 ) -> Result<lutra_bin::Result<O>, Self::Error>
131 where
132 I: lutra_bin::Encode,
133 O: lutra_bin::Decode,
134 {
135 let input = input.encode();
136 let handle = self.prepare_sync(program.inner.clone())?;
137 let output = self.execute_sync(&handle, &input)?;
138 Ok(O::decode(&output))
139 }
140
141 /// Prepares a program for execution and returns a handle, which can be
142 /// used with [RunSync::execute_sync].
143 ///
144 /// If the program is invalid, error is returned either now or later by [RunSync::execute_sync].
145 ///
146 /// When the handle is returned, the program might not be
147 /// fully prepared yet, so first execution of the program
148 /// might take longer then subsequent [RunSync::execute_sync] calls.
149 fn prepare_sync(&mut self, program: rr::Program) -> Result<Self::Prepared, Self::Error>;
150
151 /// Execute a prepared program synchronously.
152 /// Program's format must match the format supported by this runner.
153 fn execute_sync(
154 &mut self,
155 program: &Self::Prepared,
156 input: &[u8],
157 ) -> Result<vec::Vec<u8>, Self::Error>;
158
159 /// Return static interface of this runner as Lutra source code.
160 ///
161 /// Runners can provide implementations for functions that are not part of
162 /// standard library. This function returns definitions of these functions as
163 /// Lutra source code.
164 ///
165 /// For example: interpreter can provide `fs::read_parquet()`
166 /// and DuckDB runner can provide `sql::read_table()`.
167 fn get_interface_sync(&mut self) -> Result<string::String, Self::Error> {
168 Ok(string::String::new())
169 }
170
171 /// Releases any claimed resources.
172 fn shutdown_sync(&mut self) -> Result<(), Self::Error> {
173 Ok(())
174 }
175}
176
/// Standard error codes used in the runner protocol.
///
/// The value of each constant is identical to the name of the constant.
pub mod error_codes {
    /// The `DECODE_ERROR` code.
    pub const DECODE_ERROR: &str = "DECODE_ERROR";

    /// The `PREPARE_ERROR` code.
    pub const PREPARE_ERROR: &str = "PREPARE_ERROR";

    /// The `PROGRAM_NOT_FOUND` code.
    pub const PROGRAM_NOT_FOUND: &str = "PROGRAM_NOT_FOUND";

    /// The `EXECUTION_ERROR` code.
    pub const EXECUTION_ERROR: &str = "EXECUTION_ERROR";
}