Skip to main content

rivven_connect_derive/
lib.rs

1//! Derive macros for Rivven connector development
2//!
3//! This crate provides procedural macros that reduce boilerplate when implementing
4//! Rivven connectors.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use rivven_connect_derive::{SourceConfig, SinkConfig, connector_spec};
10//!
11//! #[derive(Debug, Deserialize, Validate, JsonSchema, SourceConfig)]
12//! #[source(
13//!     name = "my-source",
14//!     version = "1.0.0",
15//!     description = "Custom data source",
16//!     author = "Rivven Team",
17//!     license = "Apache-2.0",
18//!     documentation_url = "https://rivven.dev/docs/connectors/my-source"
19//! )]
20//! pub struct MySourceConfig {
21//!     #[validate(url)]
22//!     pub endpoint: String,
23//!     #[validate(range(min = 1, max = 100))]
24//!     pub batch_size: usize,
25//! }
26//!
27//! #[derive(Debug, Deserialize, Validate, JsonSchema, SinkConfig)]
28//! #[sink(
29//!     name = "my-sink",
30//!     version = "1.0.0",
31//!     batching,
32//!     batch_size = 1000
33//! )]
34//! pub struct MySinkConfig {
35//!     pub bucket: String,
36//! }
37//! ```
38
39use darling::{FromDeriveInput, FromMeta};
40use proc_macro::TokenStream;
41use quote::quote;
42use syn::{parse_macro_input, DeriveInput};
43
44/// Attributes for the Source derive macro
45#[derive(Debug, FromDeriveInput)]
46#[darling(attributes(source), supports(struct_named))]
47struct SourceAttrs {
48    ident: syn::Ident,
49    /// Connector name (e.g., "postgres-cdc")
50    #[darling(default)]
51    #[allow(dead_code)]
52    name: Option<String>,
53    /// Connector version (e.g., "1.0.0")
54    #[darling(default)]
55    #[allow(dead_code)]
56    version: Option<String>,
57    /// Description of the connector
58    #[darling(default)]
59    #[allow(dead_code)]
60    description: Option<String>,
61    /// Author or maintainer
62    #[darling(default)]
63    #[allow(dead_code)]
64    author: Option<String>,
65    /// License identifier (e.g., "Apache-2.0")
66    #[darling(default)]
67    #[allow(dead_code)]
68    license: Option<String>,
69    /// Documentation URL
70    #[darling(default)]
71    #[allow(dead_code)]
72    documentation_url: Option<String>,
73    /// Whether the source supports incremental sync
74    #[darling(default)]
75    #[allow(dead_code)]
76    incremental: bool,
77    /// Supported source types (e.g., full_refresh, incremental)
78    #[darling(default)]
79    #[allow(dead_code)]
80    source_types: Option<SourceTypesAttr>,
81}
82
83#[derive(Debug, Default, FromMeta)]
84#[allow(dead_code)]
85struct SourceTypesAttr {
86    full_refresh: bool,
87    incremental: bool,
88}
89
90/// Attributes for the Sink derive macro
91#[derive(Debug, FromDeriveInput)]
92#[darling(attributes(sink), supports(struct_named))]
93struct SinkAttrs {
94    ident: syn::Ident,
95    /// Connector name (e.g., "s3-sink")
96    #[darling(default)]
97    #[allow(dead_code)]
98    name: Option<String>,
99    /// Connector version (e.g., "1.0.0")
100    #[darling(default)]
101    #[allow(dead_code)]
102    version: Option<String>,
103    /// Description of the connector
104    #[darling(default)]
105    #[allow(dead_code)]
106    description: Option<String>,
107    /// Author or maintainer
108    #[darling(default)]
109    #[allow(dead_code)]
110    author: Option<String>,
111    /// License identifier (e.g., "Apache-2.0")
112    #[darling(default)]
113    #[allow(dead_code)]
114    license: Option<String>,
115    /// Documentation URL
116    #[darling(default)]
117    #[allow(dead_code)]
118    documentation_url: Option<String>,
119    /// Whether the sink supports batching
120    #[darling(default)]
121    #[allow(dead_code)]
122    batching: bool,
123    /// Default batch size
124    #[darling(default)]
125    #[allow(dead_code)]
126    batch_size: Option<usize>,
127}
128
129/// Attributes for the Transform derive macro
130#[derive(Debug, FromDeriveInput)]
131#[darling(attributes(transform), supports(struct_named))]
132struct TransformAttrs {
133    ident: syn::Ident,
134    /// Transform name (e.g., "filter-transform")
135    #[darling(default)]
136    #[allow(dead_code)]
137    name: Option<String>,
138    /// Transform version (e.g., "1.0.0")
139    #[darling(default)]
140    #[allow(dead_code)]
141    version: Option<String>,
142    /// Description of the transform
143    #[darling(default)]
144    #[allow(dead_code)]
145    description: Option<String>,
146}
147
148/// Derive macro for implementing SourceConfig boilerplate
149///
150/// This macro generates a `*Spec` struct with `spec()`, `name()`, and `version()` methods.
151/// The generated spec includes config schema derived from the struct's JsonSchema implementation.
152///
153/// # Attributes
154///
155/// - `#[source(name = "...")]` - Connector name (defaults to struct name without "Config")
156/// - `#[source(version = "...")]` - Connector version (default: env!("CARGO_PKG_VERSION") or "0.0.1")
157/// - `#[source(description = "...")]` - Connector description
158/// - `#[source(author = "...")]` - Author or maintainer
159/// - `#[source(license = "...")]` - License identifier (e.g., "Apache-2.0")
160/// - `#[source(documentation_url = "...")]` - Documentation URL
161/// - `#[source(incremental)]` - Enable incremental sync support
162///
163/// # Example
164///
165/// ```rust,ignore
166/// #[derive(Debug, Deserialize, Validate, JsonSchema, SourceConfig)]
167/// #[source(
168///     name = "postgres-cdc",
169///     version = "1.0.0",
170///     description = "PostgreSQL CDC connector",
171///     author = "Rivven Team",
172///     license = "Apache-2.0",
173///     incremental
174/// )]
175/// pub struct PostgresCdcConfig {
176///     pub connection_string: String,
177///     pub slot_name: String,
178/// }
179/// ```
180#[proc_macro_derive(SourceConfig, attributes(source))]
181pub fn derive_source_config(input: TokenStream) -> TokenStream {
182    let input = parse_macro_input!(input as DeriveInput);
183
184    let attrs = match SourceAttrs::from_derive_input(&input) {
185        Ok(v) => v,
186        Err(e) => return TokenStream::from(e.write_errors()),
187    };
188
189    let struct_name = &attrs.ident;
190    let spec_struct_name = quote::format_ident!("{}Spec", struct_name);
191
192    let name = attrs.name.unwrap_or_else(|| {
193        let name = struct_name.to_string();
194        name.strip_suffix("Config").unwrap_or(&name).to_lowercase()
195    });
196    // Use the calling crate's CARGO_PKG_VERSION when no version
197    // is explicitly specified, falling back to "0.0.1" only if that env var
198    // is unavailable.
199    let version_code = match &attrs.version {
200        Some(v) => quote! { #v },
201        None => quote! { match option_env!("CARGO_PKG_VERSION") {
202            Some(v) => v,
203            None => "0.0.1",
204        } },
205    };
206
207    let description_code = match &attrs.description {
208        Some(desc) => quote! { .description(#desc) },
209        None => quote! {},
210    };
211
212    let author_code = match &attrs.author {
213        Some(author) => quote! { .author(#author) },
214        None => quote! {},
215    };
216
217    let license_code = match &attrs.license {
218        Some(license) => quote! { .license(#license) },
219        None => quote! {},
220    };
221
222    let doc_url_code = match &attrs.documentation_url {
223        Some(url) => quote! { .documentation_url(#url) },
224        None => quote! {},
225    };
226
227    let incremental_code = if attrs.incremental {
228        quote! { .incremental(true) }
229    } else {
230        quote! {}
231    };
232
233    let expanded = quote! {
234        /// Auto-generated spec holder for this source configuration
235        pub struct #spec_struct_name;
236
237        impl #spec_struct_name {
238            /// Returns the connector specification with config schema
239            pub fn spec() -> rivven_connect::ConnectorSpec {
240                rivven_connect::ConnectorSpec::builder(#name, #version_code)
241                    #description_code
242                    #author_code
243                    #license_code
244                    #doc_url_code
245                    #incremental_code
246                    .config_schema::<#struct_name>()
247                    .build()
248            }
249
250            /// Returns the connector name
251            pub const fn name() -> &'static str {
252                #name
253            }
254
255            /// Returns the connector version
256            pub fn version() -> &'static str {
257                #version_code
258            }
259        }
260    };
261
262    TokenStream::from(expanded)
263}
264
265/// Derive macro for implementing SinkConfig boilerplate
266///
267/// This macro generates a `*Spec` struct with `spec()`, `name()`, `version()`, and optionally
268/// `batch_config()` methods. The generated spec includes config schema derived from the struct's
269/// JsonSchema implementation.
270///
271/// # Attributes
272///
273/// - `#[sink(name = "...")]` - Connector name (defaults to struct name without "Config")
274/// - `#[sink(version = "...")]` - Connector version (default: env!("CARGO_PKG_VERSION") or "0.0.1")
275/// - `#[sink(description = "...")]` - Connector description
276/// - `#[sink(author = "...")]` - Author or maintainer
277/// - `#[sink(license = "...")]` - License identifier (e.g., "Apache-2.0")
278/// - `#[sink(documentation_url = "...")]` - Documentation URL
279/// - `#[sink(batching)]` - Enable batching support (generates batch_config() method)
280/// - `#[sink(batch_size = N)]` - Default batch size (default: 10000)
281///
282/// # Example
283///
284/// ```rust,ignore
285/// #[derive(Debug, Deserialize, Validate, JsonSchema, SinkConfig)]
286/// #[sink(
287///     name = "s3-sink",
288///     version = "1.0.0",
289///     description = "Amazon S3 storage sink",
290///     author = "Rivven Team",
291///     license = "Apache-2.0",
292///     batching,
293///     batch_size = 1000
294/// )]
295/// pub struct S3SinkConfig {
296///     pub bucket: String,
297///     pub prefix: Option<String>,
298/// }
299/// ```
300#[proc_macro_derive(SinkConfig, attributes(sink))]
301pub fn derive_sink_config(input: TokenStream) -> TokenStream {
302    let input = parse_macro_input!(input as DeriveInput);
303
304    let attrs = match SinkAttrs::from_derive_input(&input) {
305        Ok(v) => v,
306        Err(e) => return TokenStream::from(e.write_errors()),
307    };
308
309    let struct_name = &attrs.ident;
310    let spec_struct_name = quote::format_ident!("{}Spec", struct_name);
311
312    let name = attrs.name.unwrap_or_else(|| {
313        let name = struct_name.to_string();
314        name.strip_suffix("Config").unwrap_or(&name).to_lowercase()
315    });
316    let version_code = match &attrs.version {
317        Some(v) => quote! { #v },
318        None => quote! { match option_env!("CARGO_PKG_VERSION") {
319            Some(v) => v,
320            None => "0.0.1",
321        } },
322    };
323
324    let description_code = match &attrs.description {
325        Some(desc) => quote! { .description(#desc) },
326        None => quote! {},
327    };
328
329    let author_code = match &attrs.author {
330        Some(author) => quote! { .author(#author) },
331        None => quote! {},
332    };
333
334    let license_code = match &attrs.license {
335        Some(license) => quote! { .license(#license) },
336        None => quote! {},
337    };
338
339    let doc_url_code = match &attrs.documentation_url {
340        Some(url) => quote! { .documentation_url(#url) },
341        None => quote! {},
342    };
343
344    let batch_config_code = if attrs.batching {
345        let batch_size = attrs.batch_size.unwrap_or(10_000);
346        quote! {
347            /// Returns the default batch configuration
348            pub fn batch_config() -> rivven_connect::BatchConfig {
349                rivven_connect::BatchConfig {
350                    max_records: #batch_size,
351                    ..Default::default()
352                }
353            }
354        }
355    } else {
356        quote! {}
357    };
358
359    let expanded = quote! {
360        /// Auto-generated spec holder for this sink configuration
361        pub struct #spec_struct_name;
362
363        impl #spec_struct_name {
364            /// Returns the connector specification with config schema
365            pub fn spec() -> rivven_connect::ConnectorSpec {
366                rivven_connect::ConnectorSpec::builder(#name, #version_code)
367                    #description_code
368                    #author_code
369                    #license_code
370                    #doc_url_code
371                    .config_schema::<#struct_name>()
372                    .build()
373            }
374
375            /// Returns the connector name
376            pub const fn name() -> &'static str {
377                #name
378            }
379
380            /// Returns the connector version
381            pub fn version() -> &'static str {
382                #version_code
383            }
384
385            #batch_config_code
386        }
387    };
388
389    TokenStream::from(expanded)
390}
391
392/// Derive macro for implementing TransformConfig boilerplate
393///
394/// # Attributes
395///
396/// - `#[transform(name = "...")]` - Transform name (required)
397/// - `#[transform(version = "...")]` - Transform version (default: env!("CARGO_PKG_VERSION") or "0.0.1")
398/// - `#[transform(description = "...")]` - Transform description
399///
400/// # Example
401///
402/// ```rust,ignore
403/// #[derive(Debug, Deserialize, Validate, JsonSchema, TransformConfig)]
404/// #[transform(name = "json-filter", version = "1.0.0")]
405/// pub struct JsonFilterConfig {
406///     pub field: String,
407///     pub pattern: String,
408/// }
409/// ```
410#[proc_macro_derive(TransformConfig, attributes(transform))]
411pub fn derive_transform_config(input: TokenStream) -> TokenStream {
412    let input = parse_macro_input!(input as DeriveInput);
413
414    let attrs = match TransformAttrs::from_derive_input(&input) {
415        Ok(v) => v,
416        Err(e) => return TokenStream::from(e.write_errors()),
417    };
418
419    let struct_name = &attrs.ident;
420    let spec_struct_name = quote::format_ident!("{}Spec", struct_name);
421
422    let name = attrs.name.unwrap_or_else(|| {
423        let name = struct_name.to_string();
424        name.strip_suffix("Config").unwrap_or(&name).to_lowercase()
425    });
426    let version_code = match &attrs.version {
427        Some(v) => quote! { #v },
428        None => quote! { match option_env!("CARGO_PKG_VERSION") {
429            Some(v) => v,
430            None => "0.0.1",
431        } },
432    };
433
434    // Use `.description()` consistent with SourceConfig/SinkConfig
435    let description_code = match attrs.description {
436        Some(desc) => quote! { .description(#desc) },
437        None => quote! {},
438    };
439
440    let expanded = quote! {
441        /// Auto-generated spec holder for this transform configuration
442        pub struct #spec_struct_name;
443
444        impl #spec_struct_name {
445            /// Returns the connector specification with config schema
446            pub fn spec() -> rivven_connect::ConnectorSpec {
447                rivven_connect::ConnectorSpec::builder(#name, #version_code)
448                    #description_code
449                    .config_schema::<#struct_name>()
450                    .build()
451            }
452
453            /// Returns the transform name
454            pub const fn name() -> &'static str {
455                #name
456            }
457
458            /// Returns the transform version
459            pub fn version() -> &'static str {
460                #version_code
461            }
462        }
463    };
464
465    TokenStream::from(expanded)
466}
467
468/// Attributes for connector_spec macro
469#[derive(Debug, Default, FromMeta)]
470struct ConnectorSpecAttrs {
471    /// Connector name (required — compile error if missing)
472    #[darling(default)]
473    name: Option<String>,
474    /// Connector version
475    #[darling(default)]
476    #[allow(dead_code)]
477    version: Option<String>,
478    /// Description of the connector
479    #[darling(default)]
480    #[allow(dead_code)]
481    description: Option<String>,
482    /// Documentation URL
483    #[darling(default)]
484    #[allow(dead_code)]
485    documentation_url: Option<String>,
486}
487
488/// Attribute macro for defining connector specifications inline
489///
490/// This macro can be applied to a module or struct to generate
491/// a `ConnectorSpec` with the given attributes.
492///
493/// # Example
494///
495/// ```rust,ignore
496/// #[connector_spec(
497///     name = "my-connector",
498///     version = "1.0.0",
499///     description = "A custom connector",
500///     documentation_url = "https://docs.example.com"
501/// )]
502/// pub mod my_connector {
503///     // Connector implementation
504/// }
505/// ```
506#[proc_macro_attribute]
507pub fn connector_spec(attr: TokenStream, item: TokenStream) -> TokenStream {
508    // Parse attributes using darling
509    let attr_args = match darling::ast::NestedMeta::parse_meta_list(attr.into()) {
510        Ok(v) => v,
511        Err(e) => return TokenStream::from(darling::Error::from(e).write_errors()),
512    };
513
514    let attrs = match ConnectorSpecAttrs::from_list(&attr_args) {
515        Ok(v) => v,
516        Err(e) => return TokenStream::from(e.write_errors()),
517    };
518
519    // Emit a compile error when `name` is not provided,
520    // instead of silently defaulting to "unknown".
521    let name = match attrs.name {
522        Some(n) => n,
523        None => {
524            return TokenStream::from(
525                syn::Error::new(
526                    proc_macro2::Span::call_site(),
527                    "connector_spec requires `name = \"...\"` attribute",
528                )
529                .to_compile_error(),
530            );
531        }
532    };
533    let version_code = match &attrs.version {
534        Some(v) => quote! { #v },
535        None => quote! { match option_env!("CARGO_PKG_VERSION") {
536            Some(v) => v,
537            None => "0.0.1",
538        } },
539    };
540
541    // Use `.description()` and `.documentation_url()` consistent with builder API
542    let description_code = match attrs.description {
543        Some(desc) => quote! { .description(#desc) },
544        None => quote! {},
545    };
546
547    let doc_url_code = match attrs.documentation_url {
548        Some(url) => quote! { .documentation_url(#url) },
549        None => quote! {},
550    };
551
552    let item: proc_macro2::TokenStream = item.into();
553
554    let expanded = quote! {
555        #item
556
557        /// Auto-generated connector specification
558        pub fn connector_spec() -> rivven_connect::ConnectorSpec {
559            rivven_connect::ConnectorSpec::builder(#name, #version_code)
560                #description_code
561                #doc_url_code
562                .build()
563        }
564    };
565
566    TokenStream::from(expanded)
567}
568
569#[cfg(test)]
570mod tests {
571    #[test]
572    fn test_derives_compile() {
573        // These tests just verify the macros compile
574        // Actual behavior is tested in integration tests
575    }
576}