Skip to main content

alef_adapters/
streaming.rs

1use alef_core::config::{AdapterConfig, AlefConfig, Language};
2
3/// Generate the method body and optionally a struct definition for a streaming adapter.
4///
5/// Returns `(method_body, Option<struct_definition>)`.
6/// The struct definition is only produced for languages that need a separate iterator struct
7/// (currently Python/PyO3).
8pub fn generate_body(
9    adapter: &AdapterConfig,
10    language: Language,
11    config: &AlefConfig,
12) -> anyhow::Result<(String, Option<String>)> {
13    let result = match language {
14        Language::Python => gen_python_body(adapter, config),
15        Language::Node => gen_node_body(adapter, config),
16        Language::Ruby => gen_ruby_body(adapter, config),
17        Language::Php => gen_php_body(adapter, config),
18        Language::Elixir => gen_elixir_body(adapter, config),
19        Language::Wasm => gen_wasm_body(adapter, config),
20        Language::Ffi => gen_ffi_body(adapter),
21        Language::Go => gen_go_body(adapter),
22        Language::Java => gen_java_body(adapter),
23        Language::Csharp => gen_csharp_body(adapter),
24        Language::R => gen_r_body(adapter, config),
25        Language::Rust => anyhow::bail!("Rust does not need generated binding adapters"),
26    };
27    Ok(result)
28}
29
30/// Build the call arguments with `.into()` conversion.
31fn call_args(adapter: &AdapterConfig) -> Vec<String> {
32    adapter
33        .params
34        .iter()
35        .map(|p| {
36            if p.optional {
37                format!("{}.map(Into::into)", p.name)
38            } else {
39                format!("{}.into()", p.name)
40            }
41        })
42        .collect()
43}
44
45/// Get the iterator struct name from the adapter name.
46fn iterator_name(adapter: &AdapterConfig) -> String {
47    to_pascal_case(&adapter.name) + "Iterator"
48}
49
50// ---------------------------------------------------------------------------
51// Python (PyO3)
52// ---------------------------------------------------------------------------
53
54fn gen_python_body(adapter: &AdapterConfig, config: &AlefConfig) -> (String, Option<String>) {
55    let core_path = &adapter.core_path;
56    let item_type = adapter.item_type.as_deref().unwrap_or("()");
57    let error_type = adapter.error_type.as_deref().unwrap_or("anyhow::Error");
58    let core_import = config.core_import();
59    let iter_name = iterator_name(adapter);
60
61    let args = call_args(adapter);
62    let call_str = args.join(", ");
63
64    let struct_def = format!(
65        "#[pyclass]\n\
66         pub struct {iter_name} {{\n    \
67             inner: Arc<tokio::sync::Mutex<futures::stream::BoxStream<'static, Result<{core_import}::{item_type}, {core_import}::{error_type}>>>>,\n\
68         }}\n\
69         \n\
70         #[pymethods]\n\
71         impl {iter_name} {{\n    \
72             fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {{ slf }}\n\
73             \n    \
74             fn __anext__<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyAny>>> {{\n        \
75                 let inner = self.inner.clone();\n        \
76                 pyo3_async_runtimes::tokio::future_into_py(py, async move {{\n            \
77                     let mut stream = inner.lock().await;\n            \
78                     match futures::StreamExt::next(&mut *stream).await {{\n                \
79                         Some(Ok(chunk)) => Ok(Some({item_type}::from(chunk))),\n                \
80                         Some(Err(e)) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),\n                \
81                         None => Ok(None),  // StopAsyncIteration\n            \
82                     }}\n        \
83                 }})\n    \
84             }}\n\
85         }}"
86    );
87
88    let method_body = format!(
89        "let inner = self.inner.clone();\n    \
90         let stream = inner.{core_path}({call_str});\n    \
91         Ok({iter_name} {{\n        \
92             inner: Arc::new(tokio::sync::Mutex::new(stream)),\n    \
93         }})"
94    );
95
96    (method_body, Some(struct_def))
97}
98
99// ---------------------------------------------------------------------------
100// Node (NAPI)
101// ---------------------------------------------------------------------------
102
103fn gen_node_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
104    let core_path = &adapter.core_path;
105    let item_type = adapter.item_type.as_deref().unwrap_or("()");
106
107    let args = call_args(adapter);
108    let call_str = args.join(", ");
109
110    let body = format!(
111        "use futures::StreamExt;\n    \
112         let stream = self.inner.{core_path}({call_str});\n    \
113         let chunks: Vec<_> = stream\n        \
114             .map(|r| r.map({item_type}::from))\n        \
115             .collect::<Vec<_>>().await\n        \
116             .into_iter()\n        \
117             .collect::<Result<Vec<_>, _>>()\n        \
118             .map_err(|e| napi::Error::new(napi::Status::GenericFailure, e.to_string()))?;\n    \
119         Ok(chunks)"
120    );
121
122    (body, None)
123}
124
125// ---------------------------------------------------------------------------
126// Ruby (Magnus)
127// ---------------------------------------------------------------------------
128
129fn gen_ruby_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
130    let core_path = &adapter.core_path;
131    let item_type = adapter.item_type.as_deref().unwrap_or("()");
132
133    let args = call_args(adapter);
134    let call_str = args.join(", ");
135
136    let body = format!(
137        "use futures::StreamExt;\n    \
138         let rt = tokio::runtime::Runtime::new()\n        \
139             .map_err(|e| magnus::Error::new(magnus::exception::runtime_error(), e.to_string()))?;\n    \
140         let stream = self.inner.{core_path}({call_str});\n    \
141         rt.block_on(async {{\n        \
142             stream\n            \
143                 .map(|r| r.map({item_type}::from))\n            \
144                 .collect::<Vec<_>>().await\n            \
145                 .into_iter()\n            \
146                 .collect::<Result<Vec<_>, _>>()\n    \
147         }})\n    \
148         .map_err(|e| magnus::Error::new(magnus::exception::runtime_error(), e.to_string()))"
149    );
150
151    (body, None)
152}
153
154// ---------------------------------------------------------------------------
155// PHP (ext-php-rs)
156// ---------------------------------------------------------------------------
157
158fn gen_php_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
159    let core_path = &adapter.core_path;
160    let item_type = adapter.item_type.as_deref().unwrap_or("()");
161
162    let args = call_args(adapter);
163    let call_str = args.join(", ");
164
165    let body = format!(
166        "use futures::StreamExt;\n    \
167         WORKER_RUNTIME.block_on(async {{\n        \
168             let stream = self.inner.{core_path}({call_str});\n        \
169             stream\n            \
170                 .map(|r| r.map({item_type}::from))\n            \
171                 .collect::<Vec<_>>().await\n            \
172                 .into_iter()\n            \
173                 .collect::<Result<Vec<_>, _>>()\n    \
174         }})\n    \
175         .map_err(|e| ext_php_rs::exception::PhpException::default(e.to_string()).into())"
176    );
177
178    (body, None)
179}
180
181// ---------------------------------------------------------------------------
182// Elixir (Rustler)
183// ---------------------------------------------------------------------------
184
185fn gen_elixir_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
186    let core_path = &adapter.core_path;
187    let item_type = adapter.item_type.as_deref().unwrap_or("()");
188
189    let args = call_args(adapter);
190    let call_str = args.join(", ");
191
192    let body = format!(
193        "use futures::StreamExt;\n    \
194         let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;\n    \
195         let stream = client.inner.{core_path}({call_str});\n    \
196         rt.block_on(async {{\n        \
197             stream\n            \
198                 .map(|r| r.map({item_type}::from))\n            \
199                 .collect::<Vec<_>>().await\n            \
200                 .into_iter()\n            \
201                 .collect::<Result<Vec<_>, _>>()\n    \
202         }})\n    \
203         .map_err(|e| e.to_string())"
204    );
205
206    (body, None)
207}
208
209// ---------------------------------------------------------------------------
210// WASM (wasm-bindgen)
211// ---------------------------------------------------------------------------
212
213fn gen_wasm_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
214    let core_path = &adapter.core_path;
215    let item_type = adapter.item_type.as_deref().unwrap_or("JsValue");
216
217    let args = call_args(adapter);
218    let call_str = args.join(", ");
219
220    let body = format!(
221        "use futures::StreamExt;\n    \
222         let stream = self.inner.{core_path}({call_str});\n    \
223         let chunks: Vec<_> = stream\n        \
224             .map(|r| r.map({item_type}::from))\n        \
225             .collect::<Vec<_>>().await\n        \
226             .into_iter()\n        \
227             .collect::<Result<Vec<_>, _>>()\n        \
228             .map_err(|e| JsValue::from_str(&e.to_string()))?;\n    \
229         Ok(chunks)"
230    );
231
232    (body, None)
233}
234
235// ---------------------------------------------------------------------------
236// FFI (C ABI) -- Streaming not supported
237// ---------------------------------------------------------------------------
238
239fn gen_ffi_body(adapter: &AdapterConfig) -> (String, Option<String>) {
240    let body = format!("compile_error!(\"streaming not supported via FFI: {}\")", adapter.name);
241    (body, None)
242}
243
244// ---------------------------------------------------------------------------
245// Go -- Streaming not supported via FFI
246// ---------------------------------------------------------------------------
247
248fn gen_go_body(adapter: &AdapterConfig) -> (String, Option<String>) {
249    let body = format!("compile_error!(\"streaming not supported via FFI: {}\")", adapter.name);
250    (body, None)
251}
252
253// ---------------------------------------------------------------------------
254// Java -- Streaming not supported via FFI
255// ---------------------------------------------------------------------------
256
257fn gen_java_body(adapter: &AdapterConfig) -> (String, Option<String>) {
258    let body = format!("compile_error!(\"streaming not supported via FFI: {}\")", adapter.name);
259    (body, None)
260}
261
262// ---------------------------------------------------------------------------
263// C# -- Streaming not supported via FFI
264// ---------------------------------------------------------------------------
265
266fn gen_csharp_body(adapter: &AdapterConfig) -> (String, Option<String>) {
267    let body = format!("compile_error!(\"streaming not supported via FFI: {}\")", adapter.name);
268    (body, None)
269}
270
271// ---------------------------------------------------------------------------
272// R (extendr) -- collect stream into Vec
273// ---------------------------------------------------------------------------
274
275fn gen_r_body(adapter: &AdapterConfig, _config: &AlefConfig) -> (String, Option<String>) {
276    let core_path = &adapter.core_path;
277    let item_type = adapter.item_type.as_deref().unwrap_or("Robj");
278
279    let args = call_args(adapter);
280    let call_str = args.join(", ");
281
282    let body = format!(
283        "use futures::StreamExt;\n    \
284         let rt = tokio::runtime::Runtime::new()\n        \
285             .map_err(|e| extendr_api::Error::Other(e.to_string()))?;\n    \
286         let stream = self.inner.{core_path}({call_str});\n    \
287         rt.block_on(async {{\n        \
288             stream\n            \
289                 .map(|r| r.map({item_type}::from))\n            \
290                 .collect::<Vec<_>>().await\n            \
291                 .into_iter()\n            \
292                 .collect::<Result<Vec<_>, _>>()\n    \
293         }})\n    \
294         .map_err(|e| extendr_api::Error::Other(e.to_string()))"
295    );
296
297    (body, None)
298}
299
300// ---------------------------------------------------------------------------
301// Helpers
302// ---------------------------------------------------------------------------
303
304fn to_pascal_case(s: &str) -> String {
305    s.split('_')
306        .map(|part| {
307            let mut chars = part.chars();
308            match chars.next() {
309                None => String::new(),
310                Some(first) => first.to_uppercase().to_string() + chars.as_str(),
311            }
312        })
313        .collect()
314}