Skip to main content

alembic_engine/
external.rs

1//! helpers for implementing external adapters.
2
3use crate::{ApplyReport, BackendId, Op, ProvisionReport, StateData};
4use alembic_core::{JsonMap, Key, Schema, TypeName};
5use anyhow::Result;
6use serde::{Deserialize, Serialize};
7use std::io::{self, BufReader, Read, Write};
8
/// protocol version spoken by this build of the helpers; an envelope carrying
/// any other version is answered with an "unsupported protocol version" error.
pub const EXTERNAL_PROTOCOL_VERSION: u8 = 1;
11
12/// request envelope sent to external adapters.
13#[derive(Debug, Serialize, Deserialize)]
14pub struct ExternalEnvelope {
15    /// protocol version.
16    pub version: u8,
17    /// custom plugin configuration.
18    pub setup: serde_yaml::Value,
19    /// request payload.
20    #[serde(flatten)]
21    pub request: ExternalRequest,
22}
23
24/// external adapter request variants.
25#[derive(Debug, Serialize, Deserialize)]
26#[serde(tag = "method", rename_all = "snake_case")]
27pub enum ExternalRequest {
28    /// read inventory for the requested types.
29    Read {
30        schema: Schema,
31        types: Vec<TypeName>,
32        state: StateData,
33    },
34    /// apply a set of operations.
35    Write {
36        schema: Schema,
37        ops: Vec<Op>,
38        state: StateData,
39    },
40    /// ensure the backend schema exists.
41    EnsureSchema { schema: Schema },
42}
43
44/// observed object representation for external adapters.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ExternalObject {
47    /// object type.
48    pub type_name: TypeName,
49    /// natural key for matching.
50    pub key: Key,
51    /// observed attributes.
52    pub attrs: JsonMap,
53    /// backend id when known.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub backend_id: Option<BackendId>,
56}
57
58/// response wrapper for external adapters.
59#[derive(Debug, Serialize, Deserialize)]
60pub struct ExternalResponse<T> {
61    /// whether the request succeeded.
62    pub ok: bool,
63    /// payload on success.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub result: Option<T>,
66    /// error message on failure.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub error: Option<String>,
69}
70
71impl<T> ExternalResponse<T> {
72    /// build a success response.
73    pub fn ok(result: T) -> Self {
74        Self {
75            ok: true,
76            result: Some(result),
77            error: None,
78        }
79    }
80
81    /// build an error response.
82    pub fn error(message: impl Into<String>) -> Self {
83        Self {
84            ok: false,
85            result: None,
86            error: Some(message.into()),
87        }
88    }
89
90    /// convert a result into a response.
91    pub fn from_result(result: Result<T>) -> Self {
92        match result {
93            Ok(value) => Self::ok(value),
94            Err(err) => Self::error(err.to_string()),
95        }
96    }
97}
98
99/// external adapter helper trait.
100pub trait ExternalAdapter {
101    /// initial configuration of the adapter
102    fn setup(&mut self, configuration: &serde_yaml::Value) -> Result<()>;
103
104    /// read objects from the backend.
105    fn read(
106        &mut self,
107        schema: &Schema,
108        types: &[TypeName],
109        state: &StateData,
110    ) -> Result<Vec<ExternalObject>>;
111
112    /// apply operations to the backend.
113    fn write(&mut self, schema: &Schema, ops: &[Op], state: &StateData) -> Result<ApplyReport>;
114
115    /// provision backend schema elements.
116    fn ensure_schema(&mut self, schema: &Schema) -> Result<ProvisionReport> {
117        let _ = schema;
118        Ok(ProvisionReport::default())
119    }
120}
121
122/// run an external adapter using stdin/stdout for a single request.
123pub fn run_external_adapter<A: ExternalAdapter>(
124    mut adapter: A,
125    (reader, mut writer): (impl Read, impl Write),
126) -> io::Result<()> {
127    let mut input = String::new();
128    BufReader::new(reader).read_to_string(&mut input)?;
129
130    let envelope: ExternalEnvelope = match serde_json::from_str(&input) {
131        Ok(envelope) => envelope,
132        Err(err) => return write_error(&mut writer, format!("invalid request: {err}")),
133    };
134
135    if envelope.version != EXTERNAL_PROTOCOL_VERSION {
136        return write_error(
137            &mut writer,
138            format!(
139                "unsupported protocol version {} (expected {})",
140                envelope.version, EXTERNAL_PROTOCOL_VERSION
141            ),
142        );
143    }
144
145    if let Err(e) = adapter.setup(&envelope.setup) {
146        return write_error(&mut writer, format!("invalid setup: {e}"));
147    }
148
149    match envelope.request {
150        ExternalRequest::Read {
151            schema,
152            types,
153            state,
154        } => {
155            let response = ExternalResponse::from_result(adapter.read(&schema, &types, &state));
156            write_response(&mut writer, response)
157        }
158        ExternalRequest::Write { schema, ops, state } => {
159            let response = ExternalResponse::from_result(adapter.write(&schema, &ops, &state));
160            write_response(&mut writer, response)
161        }
162        ExternalRequest::EnsureSchema { schema } => {
163            let response = ExternalResponse::from_result(adapter.ensure_schema(&schema));
164            write_response(&mut writer, response)
165        }
166    }
167}
168
169fn write_error(out: &mut impl Write, message: String) -> io::Result<()> {
170    let response = ExternalResponse::<serde_json::Value>::error(message);
171    write_response(out, response)
172}
173
174fn write_response<T: Serialize>(
175    out: &mut impl Write,
176    response: ExternalResponse<T>,
177) -> io::Result<()> {
178    serde_json::to_writer(&mut *out, &response).map_err(io::Error::other)?;
179    out.write_all(b"\n")?;
180    out.flush()
181}
182
/// convenience macro to define an external adapter `main`.
///
/// `$adapter` is an expression producing the adapter value. the generated
/// `fn main` passes it to `run_external_adapter` together with the process's
/// stdin and a buffered stdout, and returns that call's `std::io::Result`.
/// a trailing comma after the expression is accepted.
#[macro_export]
macro_rules! alembic_external_main {
    ($adapter:expr $(,)?) => {
        fn main() -> std::io::Result<()> {
            let stdin = std::io::stdin();
            // no `mut`: the writer is moved into the call, never mutated here.
            let stdout = std::io::BufWriter::new(std::io::stdout());
            $crate::external::run_external_adapter($adapter, (stdin, stdout))
        }
    };
}
194
#[cfg(test)]
mod tests {
    use super::ExternalResponse;
    use crate::{
        run_external_adapter, ApplyReport, ExternalAdapter, ExternalEnvelope, ExternalObject,
        ExternalRequest, Op, ProvisionReport, StateData, EXTERNAL_PROTOCOL_VERSION,
    };
    use alembic_core::{Schema, TypeName, TypeSchema};
    use serde_json::json;
    use serde_yaml::Value;
    use std::io::BufReader;
    use std::io::{BufRead, Write};

    #[test]
    fn external_response_ok_serializes() {
        let response = ExternalResponse::ok(vec!["one".to_string()]);
        let value = serde_json::to_value(&response).unwrap();
        assert_eq!(value, json!({"ok": true, "result": ["one"]}));
    }

    #[test]
    fn external_response_error_serializes() {
        let response: ExternalResponse<Vec<String>> = ExternalResponse::error("boom");
        let value = serde_json::to_value(&response).unwrap();
        assert_eq!(value, json!({"ok": false, "error": "boom"}));
    }

    /// adapter used by the round-trip tests below:
    /// - `setup` stores the integer found under `x`, if any;
    /// - `read` returns `x` placeholder objects;
    /// - `write` always fails;
    /// - `ensure_schema` reports every type name of the schema as a created field.
    #[derive(Debug, Default)]
    struct TestExternalAdapter {
        pub x: i64,
    }

    impl ExternalAdapter for TestExternalAdapter {
        fn setup(&mut self, configuration: &Value) -> anyhow::Result<()> {
            if let Some(x) = configuration.get("x").and_then(serde_yaml::Value::as_i64) {
                self.x = x;
            }
            Ok(())
        }

        fn read(
            &mut self,
            _schema: &Schema,
            _types: &[TypeName],
            _state: &StateData,
        ) -> anyhow::Result<Vec<ExternalObject>> {
            let objects = (0..self.x)
                .map(|_| ExternalObject {
                    type_name: TypeName::new(""),
                    key: Default::default(),
                    attrs: Default::default(),
                    backend_id: None,
                })
                .collect();
            Ok(objects)
        }

        fn write(
            &mut self,
            _schema: &Schema,
            _ops: &[Op],
            _state: &StateData,
        ) -> anyhow::Result<ApplyReport> {
            Err(anyhow::anyhow!("unsupported operation"))
        }

        fn ensure_schema(&mut self, schema: &Schema) -> anyhow::Result<ProvisionReport> {
            Ok(ProvisionReport {
                created_fields: schema.types.keys().cloned().collect(),
                ..Default::default()
            })
        }
    }

    /// run a fresh `TestExternalAdapter` against `envelope` over a pair of
    /// pipes and return the raw response line it wrote.
    ///
    /// panics if any I/O step fails or if `run_external_adapter` returns an
    /// error.
    fn exchange(envelope: &ExternalEnvelope) -> String {
        let (in_reader, mut in_writer) = std::io::pipe().unwrap();
        let (out_reader, out_writer) = std::io::pipe().unwrap();

        let worker = std::thread::spawn(move || {
            run_external_adapter(TestExternalAdapter::default(), (in_reader, out_writer))
        });

        writeln!(in_writer, "{}", serde_json::to_string(envelope).unwrap()).unwrap();
        // the adapter reads until EOF, so the write end must be closed before
        // it will answer.
        drop(in_writer);

        let mut line = String::new();
        BufReader::new(out_reader).read_line(&mut line).unwrap();

        assert!(worker.join().unwrap().is_ok());
        line
    }

    #[test]
    fn external_adapter_communication_over_stdio() {
        let dummy_type_schema = TypeSchema {
            key: [].into(),
            fields: [].into(),
        };
        let envelope = ExternalEnvelope {
            version: EXTERNAL_PROTOCOL_VERSION,
            setup: Default::default(),
            request: ExternalRequest::EnsureSchema {
                schema: Schema {
                    types: [
                        ("a".to_string(), dummy_type_schema.clone()),
                        ("b".to_string(), dummy_type_schema),
                    ]
                    .into(),
                },
            },
        };

        let response: ExternalResponse<ProvisionReport> =
            serde_json::from_str(&exchange(&envelope)).unwrap();
        assert!(response.ok);
        assert_eq!(
            response.result.unwrap().created_fields,
            vec!["a".to_string(), "b".to_string()]
        );
    }

    #[test]
    fn external_adapter_communication_error() {
        // The 'Write' request is booby trapped on TestExternalAdapter
        let envelope = ExternalEnvelope {
            version: EXTERNAL_PROTOCOL_VERSION,
            setup: Default::default(),
            request: ExternalRequest::Write {
                schema: Default::default(),
                ops: vec![],
                state: Default::default(),
            },
        };

        let response: ExternalResponse<ProvisionReport> =
            serde_json::from_str(&exchange(&envelope)).unwrap();
        assert!(response.error.is_some());
        assert!(!response.ok);
    }

    #[test]
    fn external_adapter_outdated() {
        let envelope = ExternalEnvelope {
            version: EXTERNAL_PROTOCOL_VERSION + 1,
            setup: Default::default(),
            request: ExternalRequest::EnsureSchema {
                schema: Default::default(),
            },
        };

        let response: ExternalResponse<ProvisionReport> =
            serde_json::from_str(&exchange(&envelope)).unwrap();
        assert!(!response.ok);

        // compared unconditionally: a response without a message must fail
        // the test rather than skip the check.
        let expected = format!(
            "unsupported protocol version {} (expected {})",
            EXTERNAL_PROTOCOL_VERSION + 1,
            EXTERNAL_PROTOCOL_VERSION
        );
        assert_eq!(response.error.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn external_adapter_configuration() {
        const MAGIC_NUMBER: usize = 13;

        let envelope = ExternalEnvelope {
            version: EXTERNAL_PROTOCOL_VERSION,
            setup: serde_yaml::from_str(&format!("x: {MAGIC_NUMBER}")).unwrap(),
            request: ExternalRequest::Read {
                schema: Default::default(),
                types: vec![],
                state: Default::default(),
            },
        };

        let response: ExternalResponse<Vec<ExternalObject>> =
            serde_json::from_str(&exchange(&envelope)).unwrap();
        assert!(response.ok);
        assert_eq!(response.result.unwrap().len(), MAGIC_NUMBER);
    }
}