Skip to main content

streamling_plugin_derive/
lib.rs

1#![allow(clippy::missing_const_for_thread_local)]
2
3use proc_macro::TokenStream;
4use quote::quote;
5use syn::{Ident, LitStr, Token, parse::Parse, parse::ParseStream, parse_macro_input};
6
// Type aliases to keep static/thread_local types readable and clippy-friendly.
// Channel capacities in (input, output, metrics) order.
type ChannelCapTriplet = (u32, u32, u32);
// A plugin id paired with the channel capacities recorded for it.
type PluginCapsEntry = (String, ChannelCapTriplet);

thread_local! {
    // Registries filled by the `register_*!` / `set_plugin_*_buffer!` macros and read back when
    // the `init_plugin*!` macros expand.
    //
    // Token streams are kept in string form: storing the actual TokenStream2 between macro
    // invocations fails with a "use-after-free" compiler error.
    //
    // `const { .. }` initializers are used so no lazy-initialization closure is needed, which is
    // what clippy's `missing_const_for_thread_local` lint asks for.
    static PLUGIN_COMPONENTS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_IDS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_CAPS: std::cell::RefCell<Vec<PluginCapsEntry>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_PREPROCESSOR_COMPONENTS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_PREPROCESSOR_IDS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_UDF_COMPONENTS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_UDF_DESCRIPTOR_FNS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_SIDE_OUTPUT_COMPONENTS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
    static PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS: std::cell::RefCell<Vec<String>> = const { std::cell::RefCell::new(Vec::new()) };
}
23
24// Helper function to generate both component_id and plugin_id string
25fn generate_plugin_identifiers(
26    namespace: &Option<LitStr>,
27    name: &LitStr,
28) -> (proc_macro2::TokenStream, String) {
29    let plugin_id = if let Some(namespace) = namespace {
30        format!("{}.{}", namespace.value(), name.value())
31    } else {
32        name.value()
33    };
34
35    // Create a string literal token for the match pattern
36    let component_id = quote! { #plugin_id };
37
38    (component_id, plugin_id)
39}
40
41struct PluginComponent {
42    namespace: Option<LitStr>,
43    name: LitStr,
44    component_type: Ident,
45    caps: Vec<u32>,
46}
47
48impl Parse for PluginComponent {
49    fn parse(input: ParseStream) -> syn::Result<Self> {
50        let namespace_or_name: LitStr = input.parse()?;
51        input.parse::<Token![,]>()?;
52        if input.peek(LitStr) {
53            // two literals, so namespace and name
54            let name: LitStr = input.parse()?;
55            input.parse::<Token![,]>()?;
56            let component_type: Ident = input.parse()?;
57            // Optional capacities
58            let mut caps: Vec<u32> = Vec::new();
59            while input.peek(Token![,]) {
60                input.parse::<Token![,]>()?;
61                if input.is_empty() {
62                    break;
63                }
64                if let Ok(int_lit) = input.parse::<syn::LitInt>() {
65                    caps.push(int_lit.base10_parse::<u32>()?);
66                } else {
67                    // If the next token isn't an int, stop parsing caps
68                    break;
69                }
70            }
71            Ok(PluginComponent {
72                namespace: Some(namespace_or_name),
73                name,
74                component_type,
75                caps,
76            })
77        } else if input.peek(Ident) {
78            // identifier after a single literal, so no namespace
79            let component_type: Ident = input.parse()?;
80            // Optional capacities
81            let mut caps: Vec<u32> = Vec::new();
82            while input.peek(Token![,]) {
83                input.parse::<Token![,]>()?;
84                if input.is_empty() {
85                    break;
86                }
87                if let Ok(int_lit) = input.parse::<syn::LitInt>() {
88                    caps.push(int_lit.base10_parse::<u32>()?);
89                } else {
90                    break;
91                }
92            }
93            Ok(PluginComponent {
94                namespace: None,
95                name: namespace_or_name,
96                component_type,
97                caps,
98            })
99        } else {
100            Err(syn::Error::new_spanned(
101                namespace_or_name,
102                "Expected a string literal or identifier for the namespace or name",
103            ))
104        }
105    }
106}
107
108// Explicit setters for input/output caps as trailing macros
109// Usage: set_plugin_input_buffer!("plugin.id", 1);
110//        set_plugin_output_buffer!("plugin.id", 2);
111#[proc_macro]
112pub fn set_plugin_input_buffer(input: TokenStream) -> TokenStream {
113    let input2 = proc_macro2::TokenStream::from(input);
114    let mut tokens = input2.into_iter();
115
116    // Parse plugin_id (string literal)
117    let plugin_id_lit = match tokens.next() {
118        Some(proc_macro2::TokenTree::Literal(lit)) => {
119            let lit_str = lit.to_string();
120            if lit_str.starts_with('"') && lit_str.ends_with('"') {
121                lit_str[1..lit_str.len() - 1].to_string()
122            } else {
123                return syn::Error::new_spanned(
124                    lit,
125                    "First argument must be a string literal plugin id",
126                )
127                .to_compile_error()
128                .into();
129            }
130        }
131        other => {
132            return syn::Error::new_spanned(
133                quote::quote! { #other },
134                "First argument must be a string literal plugin id",
135            )
136            .to_compile_error()
137            .into();
138        }
139    };
140
141    // Skip comma
142    match tokens.next() {
143        Some(proc_macro2::TokenTree::Punct(punct)) if punct.as_char() == ',' => {}
144        _ => {
145            return syn::Error::new_spanned(
146                quote::quote! { set_plugin_input_buffer },
147                "Expected comma after plugin id",
148            )
149            .to_compile_error()
150            .into();
151        }
152    }
153
154    // Parse capacity (integer literal)
155    let in_cap = match tokens.next() {
156        Some(proc_macro2::TokenTree::Literal(lit)) => lit.to_string().parse::<u32>().unwrap_or(1),
157        other => {
158            return syn::Error::new_spanned(
159                quote::quote! { #other },
160                "Second argument must be an integer capacity",
161            )
162            .to_compile_error()
163            .into();
164        }
165    };
166
167    PLUGIN_CAPS.with(|caps| caps.borrow_mut().push((plugin_id_lit, (in_cap, 0, 0))));
168    TokenStream::new()
169}
170
171#[proc_macro]
172pub fn set_plugin_output_buffer(input: TokenStream) -> TokenStream {
173    let input2 = proc_macro2::TokenStream::from(input);
174    let mut tokens = input2.into_iter();
175
176    // Parse plugin_id (string literal)
177    let plugin_id_lit = match tokens.next() {
178        Some(proc_macro2::TokenTree::Literal(lit)) => {
179            let lit_str = lit.to_string();
180            if lit_str.starts_with('"') && lit_str.ends_with('"') {
181                lit_str[1..lit_str.len() - 1].to_string()
182            } else {
183                return syn::Error::new_spanned(
184                    lit,
185                    "First argument must be a string literal plugin id",
186                )
187                .to_compile_error()
188                .into();
189            }
190        }
191        other => {
192            return syn::Error::new_spanned(
193                quote::quote! { #other },
194                "First argument must be a string literal plugin id",
195            )
196            .to_compile_error()
197            .into();
198        }
199    };
200
201    // Skip comma
202    match tokens.next() {
203        Some(proc_macro2::TokenTree::Punct(punct)) if punct.as_char() == ',' => {}
204        _ => {
205            return syn::Error::new_spanned(
206                quote::quote! { set_plugin_output_buffer },
207                "Expected comma after plugin id",
208            )
209            .to_compile_error()
210            .into();
211        }
212    }
213
214    // Parse capacity (integer literal)
215    let out_cap = match tokens.next() {
216        Some(proc_macro2::TokenTree::Literal(lit)) => lit.to_string().parse::<u32>().unwrap_or(1),
217        other => {
218            return syn::Error::new_spanned(
219                quote::quote! { #other },
220                "Second argument must be an integer capacity",
221            )
222            .to_compile_error()
223            .into();
224        }
225    };
226
227    PLUGIN_CAPS.with(|caps| caps.borrow_mut().push((plugin_id_lit, (0, out_cap, 0))));
228    TokenStream::new()
229}
230
231#[allow(dead_code)]
232struct InitPlugin;
233
234impl Parse for InitPlugin {
235    fn parse(_input: ParseStream) -> syn::Result<Self> {
236        Ok(InitPlugin)
237    }
238}
239
240#[proc_macro]
241pub fn register_plugin_source(input: TokenStream) -> TokenStream {
242    let PluginComponent {
243        namespace,
244        name,
245        component_type,
246        caps: _caps,
247    } = parse_macro_input!(input as PluginComponent);
248
249    let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
250
251    let component = quote! {
252        #component_id => source_generator(
253            plugin_id,
254            |rt, state, metrics, opts| streamling_plugin::IntoSourcePluginResult::into_source_result(#component_type::new(rt, state, metrics, opts)),
255            options,
256            runtime,
257            state_backend_config,
258            message_channels,
259        ),
260    };
261
262    PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
263    PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
264
265    TokenStream::new()
266}
267
268#[proc_macro]
269pub fn register_plugin_transform(input: TokenStream) -> TokenStream {
270    let PluginComponent {
271        namespace,
272        name,
273        component_type,
274        caps: _caps,
275    } = parse_macro_input!(input as PluginComponent);
276
277    let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
278
279    let component = quote! {
280        #component_id => transform_generator(
281            plugin_id,
282            |schema, rt, state, metrics, opts| streamling_plugin::IntoTransformPluginResult::into_transform_result(#component_type::new(schema, rt, state, metrics, opts)),
283            input_schema.expect("Input schema must be defined for transforms"),
284            options,
285            runtime,
286            state_backend_config,
287            message_channels,
288        ),
289    };
290
291    PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
292    PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
293
294    TokenStream::new()
295}
296
297#[proc_macro]
298pub fn register_plugin_sink(input: TokenStream) -> TokenStream {
299    let PluginComponent {
300        namespace,
301        name,
302        component_type,
303        caps: _caps,
304    } = parse_macro_input!(input as PluginComponent);
305
306    let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
307
308    let component = quote! {
309        #component_id => sink_generator(
310            plugin_id,
311            |schema, rt, state, metrics, opts| streamling_plugin::IntoSinkPluginResult::into_sink_result(#component_type::new(schema, rt, state, metrics, opts)),
312            input_schema.expect("Input schema must be defined for sinks"),
313            options,
314            runtime,
315            state_backend_config,
316            message_channels,
317        ),
318    };
319
320    PLUGIN_COMPONENTS.with(|components| components.borrow_mut().push(component.to_string()));
321    PLUGIN_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
322
323    TokenStream::new()
324}
325
326#[proc_macro]
327pub fn register_plugin_preprocessor(input: TokenStream) -> TokenStream {
328    let PluginComponent {
329        namespace,
330        name,
331        component_type,
332        caps: _caps,
333    } = parse_macro_input!(input as PluginComponent);
334
335    let (component_id, plugin_id) = generate_plugin_identifiers(&namespace, &name);
336
337    let component = quote! {
338        #component_id => preprocessor_generator(
339            plugin_id,
340            |opts| #component_type::new(opts).map(|p| std::sync::Arc::new(p) as std::sync::Arc<dyn streamling_plugin::PreprocessorPlugin>),
341            options,
342            runtime,
343            message_channels,
344        ),
345    };
346
347    PLUGIN_PREPROCESSOR_COMPONENTS
348        .with(|components| components.borrow_mut().push(component.to_string()));
349    PLUGIN_PREPROCESSOR_IDS.with(|ids| ids.borrow_mut().push(plugin_id));
350
351    TokenStream::new()
352}
353
354/// Registers a factory function that returns a `ScalarUDF` as a plugin UDF.
355///
356/// Usage: `register_plugin_udf_fn!(create_my_udf);`
357///
358/// The function must have signature `fn() -> ScalarUDF`.
359#[proc_macro]
360pub fn register_plugin_udf_fn(input: TokenStream) -> TokenStream {
361    let factory_fn: Ident = parse_macro_input!(input as Ident);
362
363    let static_name = Ident::new(
364        &format!("PLUGIN_UDF_FN_{}", factory_fn.to_string().to_uppercase()),
365        factory_fn.span(),
366    );
367    let invoke_fn_name = Ident::new(
368        &format!(
369            "plugin_udf_fn_invoke_{}",
370            factory_fn.to_string().to_lowercase()
371        ),
372        factory_fn.span(),
373    );
374    let descriptor_fn_name = Ident::new(
375        &format!(
376            "plugin_udf_fn_descriptor_{}",
377            factory_fn.to_string().to_lowercase()
378        ),
379        factory_fn.span(),
380    );
381
382    let component = quote! {
383        static #static_name: std::sync::OnceLock<datafusion::logical_expr::ScalarUDF> = std::sync::OnceLock::new();
384
385        extern "C" fn #invoke_fn_name(
386            args: RVec<streamling_plugin::SafeUdfArg>,
387            number_rows: usize,
388        ) -> RResult<streamling_plugin::SafeArrowColumn, RString> {
389            let udf = #static_name.get_or_init(#factory_fn);
390            streamling_plugin::invoke_plugin_udf(udf.inner().as_ref(), args, number_rows)
391        }
392
393        fn #descriptor_fn_name() -> Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError> {
394            let udf = #static_name.get_or_init(#factory_fn);
395            streamling_plugin::build_plugin_udf_descriptor(udf.inner().as_ref(), #invoke_fn_name)
396        }
397    };
398
399    PLUGIN_UDF_COMPONENTS.with(|components| {
400        components.borrow_mut().push(component.to_string());
401    });
402    PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
403        fns.borrow_mut().push(descriptor_fn_name.to_string());
404    });
405
406    TokenStream::new()
407}
408
409/// Registers a type implementing `ScalarUDFImpl` as a plugin UDF.
410///
411/// Usage: `register_plugin_udf!(MyUdfType);`
412///
413/// The type must implement `ScalarUDFImpl` and `Default`.
414#[proc_macro]
415pub fn register_plugin_udf(input: TokenStream) -> TokenStream {
416    let udf_type: Ident = parse_macro_input!(input as Ident);
417
418    let static_name = Ident::new(
419        &format!("PLUGIN_UDF_{}", udf_type.to_string().to_uppercase()),
420        udf_type.span(),
421    );
422    let invoke_fn_name = Ident::new(
423        &format!("plugin_udf_invoke_{}", udf_type.to_string().to_lowercase()),
424        udf_type.span(),
425    );
426    let descriptor_fn_name = Ident::new(
427        &format!(
428            "plugin_udf_descriptor_{}",
429            udf_type.to_string().to_lowercase()
430        ),
431        udf_type.span(),
432    );
433
434    let component = quote! {
435        static #static_name: std::sync::OnceLock<#udf_type> = std::sync::OnceLock::new();
436
437        extern "C" fn #invoke_fn_name(
438            args: RVec<streamling_plugin::SafeUdfArg>,
439            number_rows: usize,
440        ) -> RResult<streamling_plugin::SafeArrowColumn, RString> {
441            let instance = #static_name.get_or_init(|| #udf_type::new());
442            streamling_plugin::invoke_plugin_udf(instance, args, number_rows)
443        }
444
445        fn #descriptor_fn_name() -> Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError> {
446            let instance = #static_name.get_or_init(|| #udf_type::new());
447            streamling_plugin::build_plugin_udf_descriptor(instance, #invoke_fn_name)
448        }
449    };
450
451    PLUGIN_UDF_COMPONENTS.with(|components| {
452        components.borrow_mut().push(component.to_string());
453    });
454    PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
455        fns.borrow_mut().push(descriptor_fn_name.to_string());
456    });
457
458    TokenStream::new()
459}
460
461/// Registers a type implementing `SideOutputPlugin` as a plugin side output.
462///
463/// Usage: `register_plugin_side_output!(MySideOutputType);`
464/// Or with explicit id: `register_plugin_side_output!("my_id", MySideOutputType);`
465///
466/// The type must implement `SideOutputPlugin`.
467#[proc_macro]
468pub fn register_plugin_side_output(input: TokenStream) -> TokenStream {
469    // Parse: either just an Ident, or a LitStr "," Ident
470    let input2: proc_macro2::TokenStream = input.into();
471    let mut iter = input2.into_iter().peekable();
472
473    // Check if first token is a string literal (explicit id)
474    let (side_output_id, side_output_type) = match iter.peek() {
475        Some(proc_macro2::TokenTree::Literal(_)) => {
476            let lit = match iter.next().unwrap() {
477                proc_macro2::TokenTree::Literal(lit) => {
478                    let s = lit.to_string();
479                    if s.starts_with('"') && s.ends_with('"') {
480                        s[1..s.len() - 1].to_string()
481                    } else {
482                        return syn::Error::new_spanned(lit, "Expected string literal for id")
483                            .to_compile_error()
484                            .into();
485                    }
486                }
487                _ => unreachable!(),
488            };
489            // Skip comma
490            iter.next();
491            let type_ident: proc_macro2::TokenStream = iter.collect();
492            let type_ident: Ident =
493                syn::parse2(type_ident).expect("Expected type identifier after id");
494            (lit, type_ident)
495        }
496        _ => {
497            let type_tokens: proc_macro2::TokenStream = iter.collect();
498            let type_ident: Ident = syn::parse2(type_tokens).expect("Expected type identifier");
499            let id = type_ident.to_string().to_lowercase();
500            (id, type_ident)
501        }
502    };
503
504    let static_name = Ident::new(
505        &format!(
506            "PLUGIN_SIDE_OUTPUT_{}",
507            side_output_type.to_string().to_uppercase()
508        ),
509        side_output_type.span(),
510    );
511    let init_fn_name = Ident::new(
512        &format!(
513            "plugin_side_output_initialize_{}",
514            side_output_type.to_string().to_lowercase()
515        ),
516        side_output_type.span(),
517    );
518    let process_fn_name = Ident::new(
519        &format!(
520            "plugin_side_output_process_batch_{}",
521            side_output_type.to_string().to_lowercase()
522        ),
523        side_output_type.span(),
524    );
525    let shutdown_fn_name = Ident::new(
526        &format!(
527            "plugin_side_output_shutdown_{}",
528            side_output_type.to_string().to_lowercase()
529        ),
530        side_output_type.span(),
531    );
532    let descriptor_fn_name = Ident::new(
533        &format!(
534            "plugin_side_output_descriptor_{}",
535            side_output_type.to_string().to_lowercase()
536        ),
537        side_output_type.span(),
538    );
539
540    let component = quote! {
541        static #static_name: std::sync::LazyLock<
542            std::sync::RwLock<std::collections::HashMap<String, #side_output_type>>
543        > = std::sync::LazyLock::new(|| std::sync::RwLock::new(std::collections::HashMap::new()));
544
545        extern "C" fn #init_fn_name(
546            source_name: RString,
547            schema: streamling_plugin::SafeArrowSchema,
548            options: streamling_plugin::PluginOptions,
549            metrics_recorder: streamling_plugin::ffi::PluginMetricsRecorder,
550        ) -> RResult<(), RString> {
551            let schema_ref: arrow::datatypes::SchemaRef = schema.into();
552            let instance = #side_output_type::new(
553                source_name.as_str(),
554                schema_ref,
555                options.as_rust(),
556                metrics_recorder,
557            );
558            let mut map = #static_name.write().expect("side output instance map write lock");
559            map.insert(source_name.as_str().to_string(), instance);
560            RResult::ROk(())
561        }
562
563        extern "C" fn #process_fn_name(
564            source_name: RString,
565            data: streamling_plugin::ffi::SafeArrowArray,
566        ) -> RResult<(), RString> {
567            let map = #static_name.read().expect("side output instance map read lock");
568            let instance = match map.get(source_name.as_str()) {
569                Some(i) => i,
570                None => return RResult::RErr(RString::from("Side output not initialized for source")),
571            };
572            let batch: arrow::array::RecordBatch = data.into();
573            match instance.process_batch(&batch) {
574                Ok(()) => RResult::ROk(()),
575                Err(msg) => RResult::RErr(RString::from(msg)),
576            }
577        }
578
579        extern "C" fn #shutdown_fn_name() -> RResult<(), RString> {
580            let mut map = #static_name.write().expect("side output instance map write lock");
581            for instance in map.values() {
582                instance.shutdown();
583            }
584            // Clear the map to drop all instances, which drops their PluginMetricsRecorders
585            // and disconnects the metrics channels so process_plugin_metrics tasks can exit.
586            map.clear();
587            RResult::ROk(())
588        }
589
590        fn #descriptor_fn_name() -> streamling_plugin::PluginSideOutputDescriptor {
591            streamling_plugin::PluginSideOutputDescriptor {
592                id: RString::from(#side_output_id),
593                initialize: #init_fn_name,
594                process_batch: #process_fn_name,
595                shutdown: #shutdown_fn_name,
596            }
597        }
598    };
599
600    PLUGIN_SIDE_OUTPUT_COMPONENTS.with(|components| {
601        components.borrow_mut().push(component.to_string());
602    });
603    PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS.with(|fns| {
604        fns.borrow_mut().push(descriptor_fn_name.to_string());
605    });
606
607    TokenStream::new()
608}
609
610#[proc_macro]
611pub fn init_plugin(_input: TokenStream) -> TokenStream {
612    generate_init_plugin_code(false)
613}
614
615#[proc_macro]
616pub fn init_plugin_with_async_runtime(_input: TokenStream) -> TokenStream {
617    generate_init_plugin_code(true)
618}
619
620fn generate_init_plugin_code(use_direct_tokio: bool) -> TokenStream {
621    let components = PLUGIN_COMPONENTS.with(|components| {
622        let borrowed = components.borrow();
623        let mut combined = proc_macro2::TokenStream::new();
624
625        for component_str in borrowed.iter() {
626            if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
627                combined.extend(component_tokens);
628            }
629        }
630
631        combined
632    });
633
634    let preprocessor_components = PLUGIN_PREPROCESSOR_COMPONENTS.with(|components| {
635        let borrowed = components.borrow();
636        let mut combined = proc_macro2::TokenStream::new();
637
638        for component_str in borrowed.iter() {
639            if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
640                combined.extend(component_tokens);
641            }
642        }
643
644        combined
645    });
646
647    let udf_components = PLUGIN_UDF_COMPONENTS.with(|components| {
648        let borrowed = components.borrow();
649        let mut combined = proc_macro2::TokenStream::new();
650
651        for component_str in borrowed.iter() {
652            if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
653                combined.extend(component_tokens);
654            }
655        }
656
657        combined
658    });
659
660    let side_output_components = PLUGIN_SIDE_OUTPUT_COMPONENTS.with(|components| {
661        let borrowed = components.borrow();
662        let mut combined = proc_macro2::TokenStream::new();
663
664        for component_str in borrowed.iter() {
665            if let Ok(component_tokens) = component_str.parse::<proc_macro2::TokenStream>() {
666                combined.extend(component_tokens);
667            }
668        }
669
670        combined
671    });
672
673    let udf_descriptor_calls: Vec<proc_macro2::TokenStream> =
674        PLUGIN_UDF_DESCRIPTOR_FNS.with(|fns| {
675            fns.borrow()
676                .iter()
677                .map(|fn_name| {
678                    let ident: proc_macro2::TokenStream = fn_name.parse().unwrap();
679                    quote! { #ident() }
680                })
681                .collect()
682        });
683
684    let side_output_descriptor_calls: Vec<proc_macro2::TokenStream> =
685        PLUGIN_SIDE_OUTPUT_DESCRIPTOR_FNS.with(|fns| {
686            fns.borrow()
687                .iter()
688                .map(|fn_name| {
689                    let ident: proc_macro2::TokenStream = fn_name.parse().unwrap();
690                    quote! { #ident() }
691                })
692                .collect()
693        });
694
695    let has_udfs = !udf_descriptor_calls.is_empty();
696    let has_side_outputs = !side_output_descriptor_calls.is_empty();
697
698    let plugin_ids = PLUGIN_IDS.with(|ids| ids.borrow().clone());
699    let preprocessor_ids = PLUGIN_PREPROCESSOR_IDS.with(|ids| ids.borrow().clone());
700    let all_plugin_ids: Vec<String> = plugin_ids
701        .iter()
702        .chain(preprocessor_ids.iter())
703        .cloned()
704        .collect();
705
706    let plugin_caps = PLUGIN_CAPS.with(|caps| caps.borrow().clone());
707    let caps_inits: Vec<proc_macro2::TokenStream> = plugin_caps
708        .iter()
709        .map(|(id, (a, b, c))| {
710            quote! {
711                default_channel_caps.insert(RString::from(#id), PluginChannelCaps { input: #a, output: #b, metrics: #c });
712            }
713        })
714        .collect();
715
716    let async_imports = if use_direct_tokio {
717        quote! { use streamling_plugin::r#async::{PluginAsyncRuntimeObj, DirectTokioProxy}; }
718    } else {
719        quote! { use streamling_plugin::r#async::PluginAsyncRuntimeObj; }
720    };
721
722    let (runtime_param, runtime_setup) = if use_direct_tokio {
723        (
724            quote! { _runtime: PluginAsyncRuntimeObj, },
725            quote! { let runtime = DirectTokioProxy::new().into_async_runtime_obj(); },
726        )
727    } else {
728        (quote! { runtime: PluginAsyncRuntimeObj, }, quote! {})
729    };
730
731    let udf_descriptors_fn = if has_udfs {
732        quote! {
733            extern "C" fn udf_descriptors() -> RResult<RVec<streamling_plugin::PluginUdfDescriptor>, PluginInitializationError> {
734                let results: Vec<Result<streamling_plugin::PluginUdfDescriptor, PluginInitializationError>> = vec![
735                    #(#udf_descriptor_calls),*
736                ];
737                let mut descriptors = Vec::with_capacity(results.len());
738                for result in results {
739                    match result {
740                        Ok(descriptor) => descriptors.push(descriptor),
741                        Err(e) => return RResult::RErr(e),
742                    }
743                }
744                RResult::ROk(descriptors.into())
745            }
746        }
747    } else {
748        quote! {
749            extern "C" fn udf_descriptors() -> RResult<RVec<streamling_plugin::PluginUdfDescriptor>, PluginInitializationError> {
750                RResult::ROk(RVec::new())
751            }
752        }
753    };
754
755    let side_output_descriptors_fn = if has_side_outputs {
756        quote! {
757            extern "C" fn side_output_descriptors() -> RResult<RVec<streamling_plugin::PluginSideOutputDescriptor>, PluginInitializationError> {
758                let descriptors: Vec<streamling_plugin::PluginSideOutputDescriptor> = vec![
759                    #(#side_output_descriptor_calls),*
760                ];
761                RResult::ROk(descriptors.into())
762            }
763        }
764    } else {
765        quote! {
766            extern "C" fn side_output_descriptors() -> RResult<RVec<streamling_plugin::PluginSideOutputDescriptor>, PluginInitializationError> {
767                RResult::ROk(RVec::new())
768            }
769        }
770    };
771
772    let output = quote! {
773        use abi_stable::export_root_module;
774        use abi_stable::prefix_type::PrefixTypeTrait;
775        use abi_stable::std_types::{ROption, RResult, RString, RVec, RHashMap};
776        use abi_stable::traits::IntoReprC;
777        use streamling_plugin::ffi::SafeArrowSchema;
778        #async_imports
779        use streamling_plugin::{PluginStateBackendConfig, PluginChannels, PluginInitializationError, PluginLogging, PluginModule, PluginModuleRef, PluginOptions, PluginResult, PluginRuntimeConfiguration, PluginChannelCaps, SideOutputPlugin, sink_generator, source_generator, transform_generator, preprocessor_generator};
780
781        #udf_components
782        #side_output_components
783
784        extern "C" fn init(
785            logging: PluginLogging,
786        ) -> RResult<PluginRuntimeConfiguration, PluginInitializationError> {
787            logging.initialize_logging();
788
789            let plugin_ids: RVec<RString> = vec![
790                #(#all_plugin_ids.to_string().into_c()),*
791            ].into();
792
793            let mut default_channel_caps: RHashMap<RString, PluginChannelCaps> = RHashMap::new();
794            #(#caps_inits)*
795
796            Ok(PluginRuntimeConfiguration {
797                plugin_ids,
798                default_channel_caps,
799            }).into_c()
800        }
801
802        extern "C" fn create(
803            plugin_id: RString,
804            input_schema: ROption<SafeArrowSchema>,
805            options: PluginOptions,
806            #runtime_param
807            state_backend_config: PluginStateBackendConfig,
808            message_channels: PluginChannels,
809        ) -> RResult<PluginResult, PluginInitializationError> {
810            #runtime_setup
811
812            match plugin_id.as_str() {
813                #components
814                #preprocessor_components
815                _ => Err(PluginInitializationError::NotImplemented).into_c(),
816            }
817        }
818
819        #udf_descriptors_fn
820        #side_output_descriptors_fn
821
822        #[export_root_module]
823        pub fn get_module() -> PluginModuleRef {
824            PluginModule { init, create, udf_descriptors, side_output_descriptors }.leak_into_prefix()
825        }
826    };
827
828    output.into()
829}