1use crate::{ApplyReport, BackendId, Op, ProvisionReport, StateData};
4use alembic_core::{JsonMap, Key, Schema, TypeName};
5use anyhow::Result;
6use serde::{Deserialize, Serialize};
7use std::io::{self, BufReader, Read, Write};
8
9pub const EXTERNAL_PROTOCOL_VERSION: u8 = 1;
11
12#[derive(Debug, Serialize, Deserialize)]
14pub struct ExternalEnvelope {
15 pub version: u8,
17 pub setup: serde_yaml::Value,
19 #[serde(flatten)]
21 pub request: ExternalRequest,
22}
23
24#[derive(Debug, Serialize, Deserialize)]
26#[serde(tag = "method", rename_all = "snake_case")]
27pub enum ExternalRequest {
28 Read {
30 schema: Schema,
31 types: Vec<TypeName>,
32 state: StateData,
33 },
34 Write {
36 schema: Schema,
37 ops: Vec<Op>,
38 state: StateData,
39 },
40 EnsureSchema { schema: Schema },
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ExternalObject {
47 pub type_name: TypeName,
49 pub key: Key,
51 pub attrs: JsonMap,
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub backend_id: Option<BackendId>,
56}
57
58#[derive(Debug, Serialize, Deserialize)]
60pub struct ExternalResponse<T> {
61 pub ok: bool,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub result: Option<T>,
66 #[serde(skip_serializing_if = "Option::is_none")]
68 pub error: Option<String>,
69}
70
71impl<T> ExternalResponse<T> {
72 pub fn ok(result: T) -> Self {
74 Self {
75 ok: true,
76 result: Some(result),
77 error: None,
78 }
79 }
80
81 pub fn error(message: impl Into<String>) -> Self {
83 Self {
84 ok: false,
85 result: None,
86 error: Some(message.into()),
87 }
88 }
89
90 pub fn from_result(result: Result<T>) -> Self {
92 match result {
93 Ok(value) => Self::ok(value),
94 Err(err) => Self::error(err.to_string()),
95 }
96 }
97}
98
99pub trait ExternalAdapter {
101 fn setup(&mut self, configuration: &serde_yaml::Value) -> Result<()>;
103
104 fn read(
106 &mut self,
107 schema: &Schema,
108 types: &[TypeName],
109 state: &StateData,
110 ) -> Result<Vec<ExternalObject>>;
111
112 fn write(&mut self, schema: &Schema, ops: &[Op], state: &StateData) -> Result<ApplyReport>;
114
115 fn ensure_schema(&mut self, schema: &Schema) -> Result<ProvisionReport> {
117 let _ = schema;
118 Ok(ProvisionReport::default())
119 }
120}
121
122pub fn run_external_adapter<A: ExternalAdapter>(
124 mut adapter: A,
125 (reader, mut writer): (impl Read, impl Write),
126) -> io::Result<()> {
127 let mut input = String::new();
128 BufReader::new(reader).read_to_string(&mut input)?;
129
130 let envelope: ExternalEnvelope = match serde_json::from_str(&input) {
131 Ok(envelope) => envelope,
132 Err(err) => return write_error(&mut writer, format!("invalid request: {err}")),
133 };
134
135 if envelope.version != EXTERNAL_PROTOCOL_VERSION {
136 return write_error(
137 &mut writer,
138 format!(
139 "unsupported protocol version {} (expected {})",
140 envelope.version, EXTERNAL_PROTOCOL_VERSION
141 ),
142 );
143 }
144
145 if let Err(e) = adapter.setup(&envelope.setup) {
146 return write_error(&mut writer, format!("invalid setup: {e}"));
147 }
148
149 match envelope.request {
150 ExternalRequest::Read {
151 schema,
152 types,
153 state,
154 } => {
155 let response = ExternalResponse::from_result(adapter.read(&schema, &types, &state));
156 write_response(&mut writer, response)
157 }
158 ExternalRequest::Write { schema, ops, state } => {
159 let response = ExternalResponse::from_result(adapter.write(&schema, &ops, &state));
160 write_response(&mut writer, response)
161 }
162 ExternalRequest::EnsureSchema { schema } => {
163 let response = ExternalResponse::from_result(adapter.ensure_schema(&schema));
164 write_response(&mut writer, response)
165 }
166 }
167}
168
169fn write_error(out: &mut impl Write, message: String) -> io::Result<()> {
170 let response = ExternalResponse::<serde_json::Value>::error(message);
171 write_response(out, response)
172}
173
174fn write_response<T: Serialize>(
175 out: &mut impl Write,
176 response: ExternalResponse<T>,
177) -> io::Result<()> {
178 serde_json::to_writer(&mut *out, &response).map_err(io::Error::other)?;
179 out.write_all(b"\n")?;
180 out.flush()
181}
182
183#[macro_export]
185macro_rules! alembic_external_main {
186 ($adapter:expr) => {
187 fn main() -> std::io::Result<()> {
188 let stdin = std::io::stdin();
189 let mut stdout = std::io::BufWriter::new(std::io::stdout());
190 $crate::external::run_external_adapter($adapter, (stdin, stdout))
191 }
192 };
193}
194
195#[cfg(test)]
196mod tests {
197 use super::ExternalResponse;
198 use crate::{
199 run_external_adapter, ApplyReport, ExternalAdapter, ExternalEnvelope, ExternalObject,
200 ExternalRequest, Op, ProvisionReport, StateData, EXTERNAL_PROTOCOL_VERSION,
201 };
202 use alembic_core::{Schema, TypeName, TypeSchema};
203 use serde_json::json;
204 use serde_yaml::Value;
205 use std::io::BufReader;
206 use std::io::{BufRead, Write};
207
208 #[test]
209 fn external_response_ok_serializes() {
210 let response = ExternalResponse::ok(vec!["one".to_string()]);
211 let value = serde_json::to_value(&response).unwrap();
212 assert_eq!(value, json!({"ok": true, "result": ["one"]}));
213 }
214
215 #[test]
216 fn external_response_error_serializes() {
217 let response: ExternalResponse<Vec<String>> = ExternalResponse::error("boom");
218 let value = serde_json::to_value(&response).unwrap();
219 assert_eq!(value, json!({"ok": false, "error": "boom"}));
220 }
221
222 #[derive(Debug, Default)]
223 struct TestExternalAdapter {
224 pub x: i64,
225 }
226
227 impl ExternalAdapter for TestExternalAdapter {
228 fn setup(&mut self, configuration: &Value) -> anyhow::Result<()> {
229 if let Some(x) = configuration.get("x").and_then(serde_yaml::Value::as_i64) {
230 self.x = x;
231 }
232 Ok(())
233 }
234
235 fn read(
236 &mut self,
237 _schema: &Schema,
238 _types: &[TypeName],
239 _state: &StateData,
240 ) -> anyhow::Result<Vec<ExternalObject>> {
241 let mut result = vec![];
242 for _ in 0..self.x {
243 result.push(ExternalObject {
244 type_name: TypeName::new(""),
245 key: Default::default(),
246 attrs: Default::default(),
247 backend_id: None,
248 })
249 }
250 Ok(result)
251 }
252
253 fn write(
254 &mut self,
255 _schema: &Schema,
256 _ops: &[Op],
257 _state: &StateData,
258 ) -> anyhow::Result<ApplyReport> {
259 Err(anyhow::anyhow!("unsupported operation"))
260 }
261
262 fn ensure_schema(&mut self, schema: &Schema) -> anyhow::Result<ProvisionReport> {
263 let mut created_fields = vec![];
264 for ty_name in schema.types.keys() {
265 created_fields.push(ty_name.clone());
266 }
267 Ok(ProvisionReport {
268 created_fields,
269 ..Default::default()
270 })
271 }
272 }
273
274 #[test]
275 fn external_adapter_communication_over_stdio() {
276 let adapter = TestExternalAdapter::default();
277
278 let (in_reader, mut in_writer) = std::io::pipe().unwrap();
279 let (out_reader, out_writer) = std::io::pipe().unwrap();
280
281 let t = std::thread::spawn(move || {
282 assert!(run_external_adapter(adapter, (in_reader, out_writer)).is_ok());
283 });
284
285 let dummy_type_schema = TypeSchema {
286 key: [].into(),
287 fields: [].into(),
288 };
289
290 let request = ExternalRequest::EnsureSchema {
291 schema: Schema {
292 types: [
293 ("a".to_string(), dummy_type_schema.clone()),
294 ("b".to_string(), dummy_type_schema.clone()),
295 ]
296 .into(),
297 },
298 };
299 let envelope = ExternalEnvelope {
300 version: EXTERNAL_PROTOCOL_VERSION,
301 setup: Default::default(),
302 request,
303 };
304
305 writeln!(in_writer, "{}", serde_json::to_string(&envelope).unwrap()).unwrap();
306 drop(in_writer);
307
308 let mut response = String::new();
309 BufReader::new(out_reader).read_line(&mut response).unwrap();
310
311 let response: ExternalResponse<ProvisionReport> = serde_json::from_str(&response).unwrap();
312 assert!(response.ok);
313 assert_eq!(
314 response.result.unwrap().created_fields,
315 vec!["a".to_string(), "b".to_string()]
316 );
317
318 t.join().unwrap();
319 }
320
321 #[test]
322 fn external_adapter_communication_error() {
323 let adapter = TestExternalAdapter::default();
324
325 let (in_reader, mut in_writer) = std::io::pipe().unwrap();
326 let (out_reader, out_writer) = std::io::pipe().unwrap();
327
328 let t = std::thread::spawn(move || {
329 assert!(run_external_adapter(adapter, (in_reader, out_writer)).is_ok());
330 });
331
332 let request = ExternalRequest::Write {
334 schema: Default::default(),
335 ops: vec![],
336 state: Default::default(),
337 };
338 let envelope = ExternalEnvelope {
339 version: EXTERNAL_PROTOCOL_VERSION,
340 setup: Default::default(),
341 request,
342 };
343
344 writeln!(in_writer, "{}", serde_json::to_string(&envelope).unwrap()).unwrap();
345 drop(in_writer);
346
347 let mut response = String::new();
348 BufReader::new(out_reader).read_line(&mut response).unwrap();
349
350 let response: ExternalResponse<ProvisionReport> = serde_json::from_str(&response).unwrap();
351 assert!(response.error.is_some());
352 assert!(!response.ok);
353
354 t.join().unwrap();
355 }
356
357 #[test]
358 fn external_adapter_outdated() {
359 let adapter = TestExternalAdapter::default();
360
361 let (in_reader, mut in_writer) = std::io::pipe().unwrap();
362 let (out_reader, out_writer) = std::io::pipe().unwrap();
363
364 let t = std::thread::spawn(move || {
365 assert!(run_external_adapter(adapter, (in_reader, out_writer)).is_ok());
366 });
367
368 let request = ExternalRequest::EnsureSchema {
369 schema: Default::default(),
370 };
371 let envelope = ExternalEnvelope {
372 version: EXTERNAL_PROTOCOL_VERSION + 1,
373 setup: Default::default(),
374 request,
375 };
376
377 writeln!(in_writer, "{}", serde_json::to_string(&envelope).unwrap()).unwrap();
378 drop(in_writer);
379
380 let mut response = String::new();
381 BufReader::new(out_reader).read_line(&mut response).unwrap();
382
383 let response: ExternalResponse<ProvisionReport> = serde_json::from_str(&response).unwrap();
384 if let Some(error) = response.error {
385 assert_eq!(
386 error,
387 format!(
388 "unsupported protocol version {} (expected {})",
389 EXTERNAL_PROTOCOL_VERSION + 1,
390 EXTERNAL_PROTOCOL_VERSION
391 )
392 );
393 }
394 assert!(!response.ok);
395
396 t.join().unwrap();
397 }
398
399 #[test]
400 fn external_adapter_configuration() {
401 let adapter = TestExternalAdapter::default();
402
403 let (in_reader, mut in_writer) = std::io::pipe().unwrap();
404 let (out_reader, out_writer) = std::io::pipe().unwrap();
405
406 let t = std::thread::spawn(move || {
407 assert!(run_external_adapter(adapter, (in_reader, out_writer)).is_ok());
408 });
409
410 let request = ExternalRequest::Read {
411 schema: Default::default(),
412 types: vec![],
413 state: Default::default(),
414 };
415 const MAGIC_NUMBER: usize = 13;
416
417 let envelope = ExternalEnvelope {
418 version: EXTERNAL_PROTOCOL_VERSION,
419 setup: serde_yaml::from_str(&format!("x: {MAGIC_NUMBER}")).unwrap(),
420 request,
421 };
422
423 writeln!(in_writer, "{}", serde_json::to_string(&envelope).unwrap()).unwrap();
424 drop(in_writer);
425
426 let mut response = String::new();
427 BufReader::new(out_reader).read_line(&mut response).unwrap();
428
429 let response: ExternalResponse<Vec<ExternalObject>> =
430 serde_json::from_str(&response).unwrap();
431 assert!(response.ok);
432 assert_eq!(response.result.unwrap().len(), MAGIC_NUMBER,);
433
434 t.join().unwrap();
435 }
436}