1#![allow(clippy::missing_const_for_thread_local)]
2
3use proc_macro::TokenStream;
4use quote::quote;
5use syn::{Ident, LitStr, Token, parse::Parse, parse::ParseStream, parse_macro_input};
6
/// Channel capacities of one plugin, in the order `(input, output, metrics)`.
type ChannelCapTriplet = (u32, u32, u32);
/// A plugin id paired with the channel capacities recorded for it.
type PluginCapsEntry = (String, ChannelCapTriplet);

// Per-thread registries filled by the `register_plugin_*` / `set_plugin_*` macros and read back
// when the `init_plugin*` macros generate the plugin module. Generated code is kept as strings.
thread_local! {
    /// Stringified `match` arms for source, transform and sink plugins.
    static PLUGIN_COMPONENTS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Ids of the registered source, transform and sink plugins.
    static PLUGIN_IDS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Channel capacities recorded per plugin id.
    static PLUGIN_CAPS: std::cell::RefCell<Vec<PluginCapsEntry>> = Default::default();
    /// Stringified `match` arms for preprocessor plugins.
    static PLUGIN_PREPROCESSOR_COMPONENTS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Ids of the registered preprocessor plugins.
    static PLUGIN_PREPROCESSOR_IDS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Stringified items (static, invoke fn, descriptor fn) generated per UDF.
    static PLUGIN_UDF_COMPONENTS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Names of the generated UDF descriptor functions.
    static PLUGIN_UDF_DESCRIPTOR_FNS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Stringified items generated per side output.
    static PLUGIN_SIDE_OUTPUT_COMPONENTS: std::cell::RefCell<Vec<String>> = Default::default();
    /// Names of the generated side-output descriptor functions.
    static PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS: std::cell::RefCell<Vec<String>> = Default::default();
}
23
24fn generate_plugin_identifiers(
26 namespace: &Option<LitStr>,
27 name: &LitStr,
28) -> (proc_macro2::TokenStream, String) {
29 let plugin_id = if let Some(namespace) = namespace {
30 format!("{}.{}", namespace.value(), name.value())
31 } else {
32 name.value()
33 };
34
35 let component_id = quote! { #plugin_id };
37
38 (component_id, plugin_id)
39}
40
41struct PluginComponent {
42 namespace: Option<LitStr>,
43 name: LitStr,
44 component_type: Ident,
45 caps: Vec<u32>,
46}
47
48impl Parse for PluginComponent {
49 fn parse(input: ParseStream) -> syn::Result<Self> {
50 let namespace_or_name: LitStr = input.parse()?;
51 input.parse::<Token![,]>()?;
52 if input.peek(LitStr) {
53 let name: LitStr = input.parse()?;
55 input.parse::<Token![,]>()?;
56 let component_type: Ident = input.parse()?;
57 let mut caps: Vec<u32> = Vec::new();
59 while input.peek(Token![,]) {
60 input.parse::<Token![,]>()?;
61 if input.is_empty() {
62 break;
63 }
64 if let Ok(int_lit) = input.parse::<syn::LitInt>() {
65 caps.push(int_lit.base10_parse::<u32>()?);
66 } else {
67 break;
69 }
70 }
71 Ok(PluginComponent {
72 namespace: Some(namespace_or_name),
73 name,
74 component_type,
75 caps,
76 })
77 } else if input.peek(Ident) {
78 let component_type: Ident = input.parse()?;
80 let mut caps: Vec<u32> = Vec::new();
82 while input.peek(Token![,]) {
83 input.parse::<Token![,]>()?;
84 if input.is_empty() {
85 break;
86 }
87 if let Ok(int_lit) = input.parse::<syn::LitInt>() {
88 caps.push(int_lit.base10_parse::<u32>()?);
89 } else {
90 break;
91 }
92 }
93 Ok(PluginComponent {
94 namespace: None,
95 name: namespace_or_name,
96 component_type,
97 caps,
98 })
99 } else {
100 Err(syn::Error::new_spanned(
101 namespace_or_name,
102 "Expected a string literal or identifier for the namespace or name",
103 ))
104 }
105 }
106}
107
108#[proc_macro]
112pub fn set_plugin_input_buffer(input: TokenStream) -> TokenStream {
113 let input2 = proc_macro2::TokenStream::from(input);
114 let mut tokens = input2.into_iter();
115
116 let plugin_id_lit = match tokens.next() {
118 Some(proc_macro2::TokenTree::Literal(lit)) => {
119 let lit_str = lit.to_string();
120 if lit_str.starts_with('"') && lit_str.ends_with('"') {
121 lit_str[1..lit_str.len() - 1].to_string()
122 } else {
123 return syn::Error::new_spanned(
124 lit,
125 "First argument must be a string literal plugin id",
126 )
127 .to_compile_error()
128 .into();
129 }
130 }
131 other => {
132 return syn::Error::new_spanned(
133 quote::quote! { #other },
134 "First argument must be a string literal plugin id",
135 )
136 .to_compile_error()
137 .into();
138 }
139 };
140
141 match tokens.next() {
143 Some(proc_macro2::TokenTree::Punct(punct)) if punct.as_char() == ',' => {}
144 _ => {
145 return syn::Error::new_spanned(
146 quote::quote! { set_plugin_input_buffer },
147 "Expected comma after plugin id",
148 )
149 .to_compile_error()
150 .into();
151 }
152 }
153
154 let in_cap = match tokens.next() {
156 Some(proc_macro2::TokenTree::Literal(lit)) => lit.to_string().parse::<u32>().unwrap_or(1),
157 other => {
158 return syn::Error::new_spanned(
159 quote::quote! { #other },
160 "Second argument must be an integer capacity",
161 )
162 .to_compile_error()
163 .into();
164 }
165 };
166
167 PLUGIN_CAPS.with(|caps| caps.borrow_mut().push((plugin_id_lit, (in_cap, 0, 0))));
168 TokenStream::new()
169}
170
171#[proc_macro]
172pub fn set_plugin_output_buffer(input: TokenStream) -> TokenStream {
173 let input2 = proc_macro2::TokenStream::from(input);
174 let mut tokens = input2.into_iter();
175
176 let plugin_id_lit = match tokens.next() {
178 Some(proc_macro2::TokenTree::Literal(lit)) => {
179 let lit_str = lit.to_string();
180 if lit_str.starts_with('"') && lit_str.ends_with('"') {
181 lit_str[1..lit_str.len() - 1].to_string()
182 } else {
183 return syn::Error::new_spanned(
184 lit,
185 "First argument must be a string literal plugin id",
186 )
187 .to_compile_error()
188 .into();
189 }
190 }
191 other => {
192 return syn::Error::new_spanned(
193 quote::quote! { #other },
194 "First argument must be a string literal plugin id",
195 )
196 .to_compile_error()
197 .into();
198 }
199 };
200
201 match tokens.next() {
203 Some(proc_macro2::TokenTree::Punct(punct)) if punct.as_char() == ',' => {}
204 _ => {
205 return syn::Error::new_spanned(
206 quote::quote! { set_plugin_output_buffer },
207 "Expected comma after plugin id",
208 )
209 .to_compile_error()
210 .into();
211 }
212 }
213
214 let out_cap = match tokens.next() {
216 Some(proc_macro2::TokenTree::Literal(lit)) => lit.to_string().parse::<u32>().unwrap_or(1),
217 other => {
218 return syn::Error::new_spanned(
219 quote::quote! { #other },
220 "Second argument must be an integer capacity",
221 )
222 .to_compile_error()
223 .into();
224 }
225 };
226
227 PLUGIN_CAPS.with(|caps| caps.borrow_mut().push((plugin_id_lit, (0, out_cap, 0))));
228 TokenStream::new()
229}
230
231#[allow(dead_code)]
232struct InitPlugin;
233
234impl Parse for InitPlugin {
235 fn parse(_input: ParseStream) -> syn::Result<Self> {
236 Ok(InitPlugin)
237 }
238}
239
240#[proc_macro]
241pub fn register_plugin_source(input: TokenStream) -> TokenStream {
242 let PluginComponent {
243 namespace,
244 name,
245 component_type,
246 caps: _caps,
247 } = parse_macro_input!(input as PluginComponent);
248
249 let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
250
251 let component = quote! {
252 #component_id => source_generator(
253 plugin_id,
254 |rt, state, metrics, opts| streamling_plugin::IntoSourcePluginResult::into_source_result(#component_type::new(rt, state, metrics, opts)),
255 options,
256 runtime,
257 state_backend_config,
258 message_channels,
259 ),
260 };
261
262 PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
263 PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
264
265 TokenStream::new()
266}
267
268#[proc_macro]
269pub fn register_plugin_transform(input: TokenStream) -> TokenStream {
270 let PluginComponent {
271 namespace,
272 name,
273 component_type,
274 caps: _caps,
275 } = parse_macro_input!(input as PluginComponent);
276
277 let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
278
279 let component = quote! {
280 #component_id => transform_generator(
281 plugin_id,
282 |schema, rt, state, metrics, opts| streamling_plugin::IntoTransformPluginResult::into_transform_result(#component_type::new(schema, rt, state, metrics, opts)),
283 input_schema.expect("Input schema must be defined for transforms"),
284 options,
285 runtime,
286 state_backend_config,
287 message_channels,
288 ),
289 };
290
291 PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
292 PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
293
294 TokenStream::new()
295}
296
297#[proc_macro]
298pub fn register_plugin_sink(input: TokenStream) -> TokenStream {
299 let PluginComponent {
300 namespace,
301 name,
302 component_type,
303 caps: _caps,
304 } = parse_macro_input!(input as PluginComponent);
305
306 let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
307
308 let component = quote! {
309 #component_id => sink_generator(
310 plugin_id,
311 |schema, rt, state, metrics, opts| streamling_plugin::IntoSinkPluginResult::into_sink_result(#component_type::new(schema, rt, state, metrics, opts)),
312 input_schema.expect("Input schema must be defined for sinks"),
313 options,
314 runtime,
315 state_backend_config,
316 message_channels,
317 ),
318 };
319
320 PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
321 PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
322
323 TokenStream::new()
324}
325
326#[proc_macro]
327pub fn register_plugin_preprocessor(input: TokenStream) -> TokenStream {
328 let PluginComponent {
329 namespace,
330 name,
331 component_type,
332 caps: _caps,
333 } = parse_macro_input!(input as PluginComponent);
334
335 let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
336
337 let component = quote! {
338 #component_id => preprocessor_generator(
339 plugin_id,
340 |opts| #component_type::new(opts).map(|p| std::sync::Arc::new(p) as std::sync::Arc<dyn streamling_plugin::PreprocessorPlugin>),
341 options,
342 runtime,
343 message_channels,
344 ),
345 };
346
347 PLUGIN_PREPROCESSOR_COMPONENTS
348 .with(|components| components.borrow_mut().push(component.to_string()));
349 PLUGIN_PREPROCESSOR_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
350
351 TokenStream::new()
352}
353
354#[proc_macro]
360pub fn register_plugin_udf_fn(input: TokenStream) -> TokenStream {
361 let factory_fn: Ident = parse_macro_input!(input as Ident);
362
363 let static_name = Ident::new(
364 &format!("PLUGIN_UDF_FN_{}", factory_fn.to_string().to_uppercase()),
365 factory_fn.span(),
366 );
367 let invoke_fn_name = Ident::new(
368 &format!(
369 "plugin_udf_fn_invoke_{}",
370 factory_fn.to_string().to_lowercase()
371 ),
372 factory_fn.span(),
373 );
374 let descriptor_fn_name = Ident::new(
375 &format!(
376 "plugin_udf_fn_descriptor_{}",
377 factory_fn.to_string().to_lowercase()
378 ),
379 factory_fn.span(),
380 );
381
382 let component = quote! {
383 static #static_name: std::sync::OnceLock<datafusion::logical_expr::ScalarUDF> = std::sync::OnceLock::new();
384
385 extern "C" fn #invoke_fn_name(
386 args: RVec<streamling_plugin::SafeUdfArg>,
387 number_rows: usize,
388 ) -> RResult<streamling_plugin::SafeArrowColumn, RString> {
389 let udf = #static_name.get_or_init(#factory_fn);
390 streamling_plugin::invoke_plugin_udf(udf.inner().as_ref(), args, number_rows)
391 }
392
393 fn #descriptor_fn_name() -> Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError> {
394 let udf = #static_name.get_or_init(#factory_fn);
395 streamling_plugin::build_plugin_udf_descriptor(udf.inner().as_ref(), #invoke_fn_name)
396 }
397 };
398
399 PLUGIN_UDF_COMPONENTS.with(|components| {
400 components.borrow_mut().push(component.to_string());
401 });
402 PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
403 fns.borrow_mut().push(descriptor_fn_name.to_string());
404 });
405
406 TokenStream::new()
407}
408
409#[proc_macro]
415pub fn register_plugin_udf(input: TokenStream) -> TokenStream {
416 let udf_type: Ident = parse_macro_input!(input as Ident);
417
418 let static_name = Ident::new(
419 &format!("PLUGIN_UDF_{}", udf_type.to_string().to_uppercase()),
420 udf_type.span(),
421 );
422 let invoke_fn_name = Ident::new(
423 &format!("plugin_udf_invoke_{}", udf_type.to_string().to_lowercase()),
424 udf_type.span(),
425 );
426 let descriptor_fn_name = Ident::new(
427 &format!(
428 "plugin_udf_descriptor_{}",
429 udf_type.to_string().to_lowercase()
430 ),
431 udf_type.span(),
432 );
433
434 let component = quote! {
435 static #static_name: std::sync::OnceLock<#udf_type> = std::sync::OnceLock::new();
436
437 extern "C" fn #invoke_fn_name(
438 args: RVec<streamling_plugin::SafeUdfArg>,
439 number_rows: usize,
440 ) -> RResult<streamling_plugin::SafeArrowColumn, RString> {
441 let instance = #static_name.get_or_init(|| #udf_type::new());
442 streamling_plugin::invoke_plugin_udf(instance, args, number_rows)
443 }
444
445 fn #descriptor_fn_name() -> Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError> {
446 let instance = #static_name.get_or_init(|| #udf_type::new());
447 streamling_plugin::build_plugin_udf_descriptor(instance, #invoke_fn_name)
448 }
449 };
450
451 PLUGIN_UDF_COMPONENTS.with(|components| {
452 components.borrow_mut().push(component.to_string());
453 });
454 PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
455 fns.borrow_mut().push(descriptor_fn_name.to_string());
456 });
457
458 TokenStream::new()
459}
460
461#[proc_macro]
468pub fn register_plugin_side_output(input: TokenStream) -> TokenStream {
469 let input2: proc_macro2::TokenStream = input.into();
471 let mut iter = input2.into_iter().peekable();
472
473 let (side_output_id, side_output_type) = match iter.peek() {
475 Some(proc_macro2::TokenTree::Literal(_)) => {
476 let lit = match iter.next().unwrap() {
477 proc_macro2::TokenTree::Literal(lit) => {
478 let s = lit.to_string();
479 if s.starts_with('"') && s.ends_with('"') {
480 s[1..s.len() - 1].to_string()
481 } else {
482 return syn::Error::new_spanned(lit, "Expected string literal for id")
483 .to_compile_error()
484 .into();
485 }
486 }
487 _ => unreachable!(),
488 };
489 iter.next();
491 let type_ident: proc_macro2::TokenStream = iter.collect();
492 let type_ident: Ident =
493 syn::parse2(type_ident).expect("Expected type identifier after id");
494 (lit, type_ident)
495 }
496 _ => {
497 let type_tokens: proc_macro2::TokenStream = iter.collect();
498 let type_ident: Ident = syn::parse2(type_tokens).expect("Expected type identifier");
499 let id = type_ident.to_string().to_lowercase();
500 (id, type_ident)
501 }
502 };
503
504 let static_name = Ident::new(
505 &format!(
506 "PLUGIN_SIDE_OUTPUT_{}",
507 side_output_type.to_string().to_uppercase()
508 ),
509 side_output_type.span(),
510 );
511 let init_fn_name = Ident::new(
512 &format!(
513 "plugin_side_output_initialize_{}",
514 side_output_type.to_string().to_lowercase()
515 ),
516 side_output_type.span(),
517 );
518 let process_fn_name = Ident::new(
519 &format!(
520 "plugin_side_output_process_batch_{}",
521 side_output_type.to_string().to_lowercase()
522 ),
523 side_output_type.span(),
524 );
525 let shutdown_fn_name = Ident::new(
526 &format!(
527 "plugin_side_output_shutdown_{}",
528 side_output_type.to_string().to_lowercase()
529 ),
530 side_output_type.span(),
531 );
532 let descriptor_fn_name = Ident::new(
533 &format!(
534 "plugin_side_output_descriptor_{}",
535 side_output_type.to_string().to_lowercase()
536 ),
537 side_output_type.span(),
538 );
539
540 let component = quote! {
541 static #static_name: std::sync::LazyLock<
542 std::sync::RwLock<std::collections::HashMap<String, #side_output_type>>
543 > = std::sync::LazyLock::new(|| std::sync::RwLock::new(std::collections::HashMap::new()));
544
545 extern "C" fn #init_fn_name(
546 source_name: RString,
547 schema: streamling_plugin::SafeArrowSchema,
548 options: streamling_plugin::PluginOptions,
549 metrics_recorder: streamling_plugin::ffi::PluginMetricsRecorder,
550 ) -> RResult<(), RString> {
551 let schema_ref: arrow::datatypes::SchemaRef = schema.into();
552 let instance = #side_output_type::new(
553 source_name.as_str(),
554 schema_ref,
555 options.as_rust(),
556 metrics_recorder,
557 );
558 let mut map = #static_name.write().expect("side output instance map write lock");
559 map.insert(source_name.as_str().to_string(), instance);
560 RResult::ROk(())
561 }
562
563 extern "C" fn #process_fn_name(
564 source_name: RString,
565 data: streamling_plugin::ffi::SafeArrowArray,
566 ) -> RResult<(), RString> {
567 let map = #static_name.read().expect("side output instance map read lock");
568 let instance = match map.get(source_name.as_str()) {
569 Some(i) => i,
570 None => return RResult::RErr(RString::from("Side output not initialized for source")),
571 };
572 let batch: arrow::array::RecordBatch = data.into();
573 match instance.process_batch(&batch) {
574 Ok(()) => RResult::ROk(()),
575 Err(msg) => RResult::RErr(RString::from(msg)),
576 }
577 }
578
579 extern "C" fn #shutdown_fn_name() -> RResult<(), RString> {
580 let mut map = #static_name.write().expect("side output instance map write lock");
581 for instance in map.values() {
582 instance.shutdown();
583 }
584 map.clear();
587 RResult::ROk(())
588 }
589
590 fn #descriptor_fn_name() -> streamling_plugin::PluginSideOutputDescriptor {
591 streamling_plugin::PluginSideOutputDescriptor {
592 id: RString::from(#side_output_id),
593 initialize: #init_fn_name,
594 process_batch: #process_fn_name,
595 shutdown: #shutdown_fn_name,
596 }
597 }
598 };
599
600 PLUGIN_SIDE_OUTPUT_COMPONENTS.with(|components| {
601 components.borrow_mut().push(component.to_string());
602 });
603 PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS.with(|fns| {
604 fns.borrow_mut().push(descriptor_fn_name.to_string());
605 });
606
607 TokenStream::new()
608}
609
610#[proc_macro]
611pub fn init_plugin(_input: TokenStream) -> TokenStream {
612 generate_init_plugin_code(false)
613}
614
615#[proc_macro]
616pub fn init_plugin_with_async_runtime(_input: TokenStream) -> TokenStream {
617 generate_init_plugin_code(true)
618}
619
620fn generate_init_plugin_code(use_direct_tokio: bool) -> TokenStream {
621 let components = PLUGIN_COMPONENTS.with(|components| {
622 let borrowed = components.borrow();
623 let mut combined = proc_macro2::TokenStream::new();
624
625 for component_str in borrowed.iter() {
626 if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
627 combined.extend(component_tokens);
628 }
629 }
630
631 combined
632 });
633
634 let preprocessor_components = PLUGIN_PREPROCESSOR_COMPONENTS.with(|components| {
635 let borrowed = components.borrow();
636 let mut combined = proc_macro2::TokenStream::new();
637
638 for component_str in borrowed.iter() {
639 if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
640 combined.extend(component_tokens);
641 }
642 }
643
644 combined
645 });
646
647 let udf_components = PLUGIN_UDF_COMPONENTS.with(|components| {
648 let borrowed = components.borrow();
649 let mut combined = proc_macro2::TokenStream::new();
650
651 for component_str in borrowed.iter() {
652 if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
653 combined.extend(component_tokens);
654 }
655 }
656
657 combined
658 });
659
660 let side_output_components = PLUGIN_SIDE_OUTPUT_COMPONENTS.with(|components| {
661 let borrowed = components.borrow();
662 let mut combined = proc_macro2::TokenStream::new();
663
664 for component_str in borrowed.iter() {
665 if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
666 combined.extend(component_tokens);
667 }
668 }
669
670 combined
671 });
672
673 let udf_descriptor_calls: Vec<proc_macro2::TokenStream> =
674 PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
675 fns.borrow()
676 .iter()
677 .map(|fn_name| {
678 let ident: proc_macro2::TokenStream = fn_name.parse().unwrap();
679 quote! { #ident() }
680 })
681 .collect()
682 });
683
684 let side_output_descriptor_calls: Vec<proc_macro2::TokenStream> =
685 PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS.with(|fns| {
686 fns.borrow()
687 .iter()
688 .map(|fn_name| {
689 let ident: proc_macro2::TokenStream = fn_name.parse().unwrap();
690 quote! { #ident() }
691 })
692 .collect()
693 });
694
695 let has_udfs = !udf_descriptor_calls.is_empty();
696 let has_side_outputs = !side_output_descriptor_calls.is_empty();
697
698 let plugin_ids = PLUGIN_IDS.with(|ids| ids.borrow().clone());
699 let preprocessor_ids = PLUGIN_PREPROCESSOR_IDS.with(|ids| ids.borrow().clone());
700 let all_plugin_ids: Vec<String> = plugin_ids
701 .iter()
702 .chain(preprocessor_ids.iter())
703 .cloned()
704 .collect();
705
706 let plugin_caps = PLUGIN_CAPS.with(|caps| caps.borrow().clone());
707 let caps_inits: Vec<proc_macro2::TokenStream> = plugin_caps
708 .iter()
709 .map(|(id, (a, b, c))| {
710 quote! {
711 default_channel_caps.insert(RString::from(#id), PluginChannelCaps { input: #a, output: #b, metrics: #c });
712 }
713 })
714 .collect();
715
716 let async_imports = if use_direct_tokio {
717 quote! { use streamling_plugin::r#async::{PluginAsyncRuntimeObj, DirectTokioProxy}; }
718 } else {
719 quote! { use streamling_plugin::r#async::PluginAsyncRuntimeObj; }
720 };
721
722 let (runtime_param, runtime_setup) = if use_direct_tokio {
723 (
724 quote! { _runtime: PluginAsyncRuntimeObj, },
725 quote! { let runtime = DirectTokioProxy::new().into_async_runtime_obj(); },
726 )
727 } else {
728 (quote! { runtime: PluginAsyncRuntimeObj, }, quote! {})
729 };
730
731 let udf_descriptors_fn = if has_udfs {
732 quote! {
733 extern "C" fn udf_descriptors() -> RResult<RVec<streamling_plugin::PluginUdfDescriptor>, PluginInitializationError> {
734 let results: Vec<Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError>> = vec![
735 #(#udf_descriptor_calls),*
736 ];
737 let mut descriptors = Vec::with_capacity(results.len());
738 for result in results {
739 match result {
740 Ok(descriptor) => descriptors.push(descriptor),
741 Err(e) => return RResult::RErr(e),
742 }
743 }
744 RResult::ROk(descriptors.into())
745 }
746 }
747 } else {
748 quote! {
749 extern "C" fn udf_descriptors() -> RResult<RVec<streamling_plugin::PluginUdfDescriptor>, PluginInitializationError> {
750 RResult::ROk(RVec::new())
751 }
752 }
753 };
754
755 let side_output_descriptors_fn = if has_side_outputs {
756 quote! {
757 extern "C" fn side_output_descriptors() -> RResult<RVec<streamling_plugin::PluginSideOutputDescriptor>, PluginInitializationError> {
758 let descriptors: Vec<streamling_plugin::PluginSideOutputDescriptor> = vec![
759 #(#side_output_descriptor_calls),*
760 ];
761 RResult::ROk(descriptors.into())
762 }
763 }
764 } else {
765 quote! {
766 extern "C" fn side_output_descriptors() -> RResult<RVec<streamling_plugin::PluginSideOutputDescriptor>, PluginInitializationError> {
767 RResult::ROk(RVec::new())
768 }
769 }
770 };
771
772 let output = quote! {
773 use abi_stable::export_root_module;
774 use abi_stable::prefix_type::PrefixTypeTrait;
775 use abi_stable::std_types::{ROption, RResult, RString, RVec, RHashMap};
776 use abi_stable::traits::IntoReprC;
777 use streamling_plugin::ffi::SafeArrowSchema;
778 #async_imports
779 use streamling_plugin::{PluginStateBackendConfig, PluginChannels, PluginInitializationError, PluginLogging, PluginModule, PluginModuleRef, PluginOptions, PluginResult, PluginRuntimeConfiguration, PluginChannelCaps, SideOutputPlugin, sink_generator, source_generator, transform_generator, preprocessor_generator};
780
781 #udf_components
782 #side_output_components
783
784 extern "C" fn init(
785 logging: PluginLogging,
786 ) -> RResult<PluginRuntimeConfiguration, PluginInitializationError> {
787 logging.initialize_logging();
788
789 let plugin_ids: RVec<RString> = vec![
790 #(#all_plugin_ids.to_string().into_c()),*
791 ].into();
792
793 let mut default_channel_caps: RHashMap<RString, PluginChannelCaps> = RHashMap::new();
794 #(#caps_inits)*
795
796 Ok(PluginRuntimeConfiguration {
797 plugin_ids,
798 default_channel_caps,
799 }).into_c()
800 }
801
802 extern "C" fn create(
803 plugin_id: RString,
804 input_schema: ROption<SafeArrowSchema>,
805 options: PluginOptions,
806 #runtime_param
807 state_backend_config: PluginStateBackendConfig,
808 message_channels: PluginChannels,
809 ) -> RResult<PluginResult, PluginInitializationError> {
810 #runtime_setup
811
812 match plugin_id.as_str() {
813 #components
814 #preprocessor_components
815 _ => Err(PluginInitializationError::NotImplemented).into_c(),
816 }
817 }
818
819 #udf_descriptors_fn
820 #side_output_descriptors_fn
821
822 #[export_root_module]
823 pub fn get_module() -> PluginModuleRef {
824 PluginModule { init, create, udf_descriptors, side_output_descriptors }.leak_into_prefix()
825 }
826 };
827
828 output.into()
829}