rigatoni_core/
destination.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Destination Trait and Error Types
18//!
19//! This module defines the core [`Destination`] trait that all destination implementations
20//! must satisfy. Destinations are the final stage in the data replication pipeline where processed
21//! events are written to external systems like S3, `BigQuery`, Kafka, or databases.
22//!
23//! # Architecture
24//!
25//! The [`Destination`] trait provides a uniform interface for writing batches of events
26//! and managing destination state. It's designed to be:
27//!
28//! - **Async-first**: All operations are asynchronous for high throughput
29//! - **Batch-oriented**: Events are written in batches for efficiency
30//! - **Thread-safe**: Implementations must be `Send + Sync` for concurrent access
31//! - **Error-resilient**: Comprehensive error handling with retry capabilities
32//! - **Extensible**: Optional methods allow future additions without breaking changes
33//!
34//! # Design Rationale
35//!
36//! ## Why Batching?
37//!
38//! Most external systems perform better with batch operations rather than individual writes.
39//! The [`Destination::write_batch`] method allows implementations to:
40//! - Amortize network overhead across multiple events
41//! - Use bulk APIs for better throughput
42//! - Implement efficient memory management
43//!
44//! ## Why Separate `flush()`?
45//!
46//! The [`Destination::flush`] method is separate from [`Destination::write_batch`] because:
47//! - **Buffering control**: Destinations may buffer writes internally for efficiency
48//! - **Durability guarantees**: `flush()` provides a synchronization point for durability
49//! - **Transaction boundaries**: Allows grouping multiple batches into transactions
50//! - **Graceful shutdown**: Ensures all buffered data is written before termination
51//!
52//! ## Thread Safety
53//!
54//! All implementations must be `Send + Sync + 'static` to support:
55//! - Concurrent writes from multiple producers
56//! - Safe sharing across async task boundaries
57//! - Use in thread pools and work stealing schedulers
58//!
59//! # Examples
60//!
61//! ## Implementing a Custom Destination
62//!
63//! ```rust
64//! use rigatoni_core::destination::{Destination, DestinationError};
65//! use rigatoni_core::event::ChangeEvent;
66//! use async_trait::async_trait;
67//!
68//! /// A simple file-based destination
69//! pub struct FileDestination {
70//!     path: String,
71//!     buffer: Vec<String>,
72//! }
73//!
74//! #[async_trait]
75//! impl Destination for FileDestination {
76//!     async fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), DestinationError> {
77//!         // Buffer events as JSON lines
78//!         for event in events {
79//!             let json = serde_json::to_string(event)
80//!                 .map_err(|e| DestinationError::serialization(e, "Failed to serialize event"))?;
81//!             self.buffer.push(json);
82//!         }
83//!         Ok(())
84//!     }
85//!
86//!     async fn flush(&mut self) -> Result<(), DestinationError> {
87//!         // Write all buffered events to file
88//!         if !self.buffer.is_empty() {
89//!             // Implementation would write to file here
90//!             self.buffer.clear();
91//!         }
92//!         Ok(())
93//!     }
94//!
95//!     fn supports_transactions(&self) -> bool {
96//!         false // File writes don't support transactions
97//!     }
98//!
99//!     async fn close(&mut self) -> Result<(), DestinationError> {
100//!         self.flush().await
101//!     }
102//! }
103//! ```
104//!
105//! ## Using a Destination
106//!
107//! ```rust,no_run
108//! # use rigatoni_core::destination::{Destination, DestinationError};
109//! # use rigatoni_core::event::ChangeEvent;
110//! # async fn example(mut dest: impl Destination, events: Vec<ChangeEvent>) -> Result<(), DestinationError> {
111//! // Write a batch of events
112//! dest.write_batch(&events).await?;
113//!
114//! // Flush to ensure durability
115//! dest.flush().await?;
116//!
117//! // Clean shutdown
118//! dest.close().await?;
119//! # Ok(())
120//! # }
121//! ```
122//!
123//! # Error Handling
124//!
125//! The [`DestinationError`] type provides detailed error classification:
126//! - [`DestinationError::ConnectionError`]: Network/connection failures (retryable)
127//! - [`DestinationError::SerializationError`]: Data serialization failures (non-retryable)
128//! - [`DestinationError::WriteError`]: Write operation failures (may be retryable)
129//! - [`DestinationError::ConfigurationError`]: Invalid configuration (non-retryable)
130//! - [`DestinationError::CapacityError`]: Resource limits exceeded (backpressure signal)
131//!
132//! Each error variant includes context about retryability to help callers implement
133//! appropriate retry strategies.
134//!
135//! # Performance Considerations
136//!
137//! ## Trait Objects vs Generics
138//!
139//! The trait can be used in two ways:
140//!
141//! ```rust
142//! # use rigatoni_core::destination::Destination;
143//! // Static dispatch (zero-cost abstraction, monomorphization)
144//! async fn process_with_static<D: Destination>(dest: D) {
145//!     // Compiler generates specialized code for each destination type
146//! }
147//!
148//! // Dynamic dispatch (runtime flexibility, single compiled function)
149//! async fn process_with_dynamic(dest: Box<dyn Destination>) {
150//!     // Single compiled function, virtual dispatch overhead
151//! }
152//! ```
153//!
154//! **Trade-offs**:
155//! - Static dispatch: Faster (no vtable), larger binary (code duplication), compile-time type
156//! - Dynamic dispatch: Smaller binary, allows runtime destination selection, slight overhead
157//!
158//! For most data replication workloads, the difference is negligible compared to I/O costs.
159//!
160//! # Future Evolution
161//!
162//! The trait is designed for non-breaking evolution:
163//! - New optional methods can be added with default implementations
164//! - The [`Destination::metadata`] method provides capability discovery
165//! - Version negotiation can be added through metadata
166//!
167//! Planned additions:
168//! - `async fn begin_transaction(&mut self) -> Result<TransactionId, DestinationError>`
169//! - `async fn commit_transaction(&mut self, id: TransactionId) -> Result<(), DestinationError>`
170//! - `async fn health_check(&self) -> Result<HealthStatus, DestinationError>`
171
172use crate::event::ChangeEvent;
173use async_trait::async_trait;
174use std::collections::HashMap;
175use thiserror::Error;
176
177/// Errors that can occur when writing to a destination.
178///
179/// Each error variant includes information about whether the operation is retryable
180/// and provides context for error handling and observability.
181#[derive(Error, Debug)]
182pub enum DestinationError {
183    /// Connection to the destination failed.
184    ///
185    /// This is typically retryable after a backoff period. Examples include:
186    /// - Network timeouts
187    /// - DNS resolution failures
188    /// - Connection refused
189    /// - TLS handshake failures
190    #[error("Connection error: {message}")]
191    ConnectionError {
192        /// Human-readable error message
193        message: String,
194        /// The underlying connection error
195        #[source]
196        source: Option<Box<dyn std::error::Error + Send + Sync>>,
197    },
198
199    /// Failed to serialize events for writing.
200    ///
201    /// This is typically non-retryable as it indicates a data quality issue.
202    /// Examples include:
203    /// - Invalid UTF-8 sequences
204    /// - Schema validation failures
205    /// - Unsupported data types
206    #[error("Serialization error: {message}")]
207    SerializationError {
208        /// Human-readable error message
209        message: String,
210        /// The underlying serialization error
211        #[source]
212        source: Option<Box<dyn std::error::Error + Send + Sync>>,
213    },
214
215    /// Failed to write events to the destination.
216    ///
217    /// Retryability depends on the specific cause. Examples include:
218    /// - Quota exceeded (retryable after backoff)
219    /// - Permission denied (non-retryable)
220    /// - Destination unavailable (retryable)
221    /// - Invalid data format (non-retryable)
222    #[error("Write error: {message}")]
223    WriteError {
224        /// Human-readable error message
225        message: String,
226        /// Whether this specific write error is retryable
227        retryable: bool,
228        /// The underlying error
229        #[source]
230        source: Option<Box<dyn std::error::Error + Send + Sync>>,
231    },
232
233    /// Invalid destination configuration.
234    ///
235    /// This is non-retryable and indicates a programming or configuration error.
236    /// Examples include:
237    /// - Missing required configuration parameters
238    /// - Invalid connection strings
239    /// - Conflicting options
240    #[error("Configuration error: {message}")]
241    ConfigurationError {
242        /// Human-readable error message
243        message: String,
244        /// Configuration parameter name if applicable
245        parameter: Option<String>,
246    },
247
248    /// Destination capacity exceeded.
249    ///
250    /// This signals backpressure and should pause writes. Examples include:
251    /// - Write buffer full
252    /// - Rate limit exceeded
253    /// - Disk space exhausted
254    /// - Memory limit reached
255    #[error("Capacity error: {message}")]
256    CapacityError {
257        /// Human-readable error message
258        message: String,
259        /// Current capacity utilization (0.0 - 1.0)
260        utilization: Option<f64>,
261        /// Suggested wait time before retry
262        retry_after: Option<std::time::Duration>,
263    },
264
265    /// A generic error occurred.
266    ///
267    /// Used for errors that don't fit other categories or for wrapping
268    /// destination-specific errors.
269    #[error("Destination error: {message}")]
270    Other {
271        /// Human-readable error message
272        message: String,
273        /// Whether this error is retryable
274        retryable: bool,
275        /// The underlying error
276        #[source]
277        source: Option<Box<dyn std::error::Error + Send + Sync>>,
278    },
279}
280
281impl DestinationError {
282    /// Creates a connection error from any error type.
283    #[must_use]
284    pub fn connection(source: impl std::error::Error + Send + Sync + 'static) -> Self {
285        Self::ConnectionError {
286            message: source.to_string(),
287            source: Some(Box::new(source)),
288        }
289    }
290
291    /// Creates a connection error with a custom message.
292    #[must_use]
293    pub fn connection_msg(message: impl Into<String>) -> Self {
294        Self::ConnectionError {
295            message: message.into(),
296            source: None,
297        }
298    }
299
300    /// Creates a serialization error from any error type.
301    #[must_use]
302    pub fn serialization(
303        source: impl std::error::Error + Send + Sync + 'static,
304        message: impl Into<String>,
305    ) -> Self {
306        Self::SerializationError {
307            message: message.into(),
308            source: Some(Box::new(source)),
309        }
310    }
311
312    /// Creates a write error with retryability information.
313    #[must_use]
314    pub fn write(source: impl std::error::Error + Send + Sync + 'static, retryable: bool) -> Self {
315        Self::WriteError {
316            message: source.to_string(),
317            retryable,
318            source: Some(Box::new(source)),
319        }
320    }
321
322    /// Creates a write error with a custom message.
323    #[must_use]
324    pub fn write_msg(message: impl Into<String>, retryable: bool) -> Self {
325        Self::WriteError {
326            message: message.into(),
327            retryable,
328            source: None,
329        }
330    }
331
332    /// Creates a configuration error.
333    #[must_use]
334    pub fn configuration(message: impl Into<String>, parameter: Option<String>) -> Self {
335        Self::ConfigurationError {
336            message: message.into(),
337            parameter,
338        }
339    }
340
341    /// Creates a capacity error with optional metadata.
342    ///
343    /// # Panics
344    ///
345    /// Panics in debug builds if `utilization` is provided and not in range [0.0, 1.0].
346    #[must_use]
347    pub fn capacity(
348        message: impl Into<String>,
349        utilization: Option<f64>,
350        retry_after: Option<std::time::Duration>,
351    ) -> Self {
352        // Validate utilization range in debug builds
353        if let Some(u) = utilization {
354            debug_assert!(
355                (0.0..=1.0).contains(&u),
356                "utilization must be in range [0.0, 1.0], got {u}"
357            );
358        }
359
360        Self::CapacityError {
361            message: message.into(),
362            utilization,
363            retry_after,
364        }
365    }
366
367    /// Creates a generic error.
368    #[must_use]
369    pub fn other(source: impl std::error::Error + Send + Sync + 'static, retryable: bool) -> Self {
370        Self::Other {
371            message: source.to_string(),
372            retryable,
373            source: Some(Box::new(source)),
374        }
375    }
376
377    /// Returns whether this error is retryable.
378    ///
379    /// This helps callers implement appropriate retry strategies:
380    /// - Retryable errors should be retried with exponential backoff
381    /// - Non-retryable errors should fail fast or route to dead letter queue
382    #[must_use]
383    pub const fn is_retryable(&self) -> bool {
384        match self {
385            Self::ConnectionError { .. } | Self::CapacityError { .. } => true,
386            Self::SerializationError { .. } | Self::ConfigurationError { .. } => false,
387            Self::WriteError { retryable, .. } | Self::Other { retryable, .. } => *retryable,
388        }
389    }
390
391    /// Returns suggested wait time before retry, if applicable.
392    #[must_use]
393    pub const fn retry_after(&self) -> Option<std::time::Duration> {
394        match self {
395            Self::CapacityError { retry_after, .. } => *retry_after,
396            _ => None,
397        }
398    }
399}
400
401/// Metadata about a destination's capabilities.
402///
403/// This allows runtime introspection of destination features and is used for:
404/// - Feature detection and capability negotiation
405/// - Monitoring and observability
406/// - Dynamic pipeline configuration
407#[derive(Debug, Clone, PartialEq, Eq)]
408pub struct DestinationMetadata {
409    /// Human-readable destination name (e.g., "S3", "`BigQuery`", "Kafka")
410    pub name: String,
411
412    /// Destination type identifier (e.g., "s3", "bigquery", "kafka")
413    pub destination_type: String,
414
415    /// Whether the destination supports transactions
416    pub supports_transactions: bool,
417
418    /// Maximum batch size hint (None = no limit)
419    pub max_batch_size: Option<usize>,
420
421    /// Whether the destination supports concurrent writes
422    pub supports_concurrent_writes: bool,
423
424    /// Additional destination-specific metadata
425    pub properties: HashMap<String, String>,
426}
427
428impl DestinationMetadata {
429    /// Creates new metadata with required fields.
430    #[must_use]
431    pub fn new(name: impl Into<String>, destination_type: impl Into<String>) -> Self {
432        Self {
433            name: name.into(),
434            destination_type: destination_type.into(),
435            supports_transactions: false,
436            max_batch_size: None,
437            supports_concurrent_writes: true,
438            properties: HashMap::new(),
439        }
440    }
441
442    /// Sets transaction support.
443    #[must_use]
444    pub const fn with_transactions(mut self, supports: bool) -> Self {
445        self.supports_transactions = supports;
446        self
447    }
448
449    /// Sets maximum batch size.
450    #[must_use]
451    pub const fn with_max_batch_size(mut self, size: usize) -> Self {
452        self.max_batch_size = Some(size);
453        self
454    }
455
456    /// Sets concurrent write support.
457    #[must_use]
458    pub const fn with_concurrent_writes(mut self, supports: bool) -> Self {
459        self.supports_concurrent_writes = supports;
460        self
461    }
462
463    /// Adds a custom property.
464    #[must_use]
465    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
466        self.properties.insert(key.into(), value.into());
467        self
468    }
469}
470
471/// The core destination trait for writing events to external systems.
472///
473/// All destination implementations (S3, `BigQuery`, Kafka, databases, etc.) must implement
474/// this trait. It provides a uniform interface for the Rigatoni pipeline to write events
475/// regardless of the underlying destination system.
476///
477/// # Thread Safety
478///
479/// Implementations must be `Send + Sync + 'static` to support:
480/// - Concurrent access from multiple async tasks
481/// - Safe transfer across thread boundaries
482/// - Use in work-stealing schedulers
483///
484/// # Async Considerations
485///
486/// All methods are async to support:
487/// - Non-blocking I/O operations
488/// - Efficient resource utilization
489/// - High concurrency without thread-per-connection
490///
491/// The `#[async_trait]` macro is used because native async trait methods have limitations
492/// in Rust (as of 2024). This macro transforms async trait methods into methods that
493/// return `Pin<Box<dyn Future + Send>>`, enabling dynamic dispatch while maintaining
494/// ergonomic async/await syntax.
495///
496/// # Error Handling
497///
498/// All fallible operations return [`DestinationError`] which provides:
499/// - Detailed error classification
500/// - Retryability information
501/// - Error context for observability
502/// - Source error chaining
503///
504/// Implementations should provide specific error information to help callers make
505/// informed retry decisions.
506///
507/// # Implementation Guide
508///
509/// When implementing this trait:
510///
511/// 1. **`write_batch`**: Should handle batch writes efficiently
512///    - Consider internal buffering for performance
513///    - Validate events before writing
514///    - Provide detailed errors for each failure mode
515///    - Handle partial failures appropriately
516///
517/// 2. **`flush`**: Must ensure all buffered data is persisted
518///    - Block until writes complete
519///    - Return errors if persistence fails
520///    - Safe to call multiple times (idempotent)
521///
522/// 3. **`supports_transactions`**: Indicate if ACID transactions are supported
523///    - Return `true` only if full ACID guarantees available
524///    - Consider returning `false` for "best effort" systems
525///
526/// 4. **`close`**: Clean up resources and ensure data durability
527///    - Call `flush()` internally
528///    - Close connections/files
529///    - Release resources
530///    - Safe to call multiple times (idempotent)
531///
532/// 5. **`metadata`**: Provide accurate capability information
533///    - Helps pipeline make optimization decisions
534///    - Enables dynamic configuration
535///    - Supports observability
536///
537/// # Examples
538///
539/// See module-level documentation for complete implementation examples.
540#[async_trait]
541pub trait Destination: Send + Sync {
542    /// Writes a batch of events to the destination.
543    ///
544    /// This is the primary method for writing data. Implementations should:
545    /// - Process events efficiently (consider batching to destination API)
546    /// - Maintain event ordering within the batch when possible
547    /// - Handle partial failures gracefully
548    /// - Provide detailed error information
549    ///
550    /// # Buffering
551    ///
552    /// Implementations may buffer events internally for performance. If buffering is used:
553    /// - Document the buffering behavior
554    /// - Implement `flush()` to persist buffered data
555    /// - Consider memory limits
556    ///
557    /// # Error Handling
558    ///
559    /// If this method returns an error:
560    /// - The batch should be considered failed
561    /// - Callers should check `DestinationError::is_retryable()`
562    /// - Partial writes should be rolled back if possible
563    ///
564    /// # Arguments
565    ///
566    /// * `events` - The batch of events to write. Empty batches should succeed without error.
567    ///
568    /// # Returns
569    ///
570    /// - `Ok(())` if the batch was written successfully (or buffered for later write)
571    /// - `Err(DestinationError)` if the write failed
572    ///
573    /// # Examples
574    ///
575    /// ```rust,no_run
576    /// # use rigatoni_core::destination::{Destination, DestinationError};
577    /// # use rigatoni_core::event::ChangeEvent;
578    /// # async fn example(mut dest: impl Destination, events: Vec<ChangeEvent>) -> Result<(), DestinationError> {
579    /// // Write a batch
580    /// dest.write_batch(&events).await?;
581    ///
582    /// // Empty batches are allowed
583    /// dest.write_batch(&[]).await?;
584    /// # Ok(())
585    /// # }
586    /// ```
587    async fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), DestinationError>;
588
589    /// Flushes any buffered data to ensure durability.
590    ///
591    /// This method ensures that all previously written data is persisted to the destination.
592    /// After `flush()` returns successfully, all prior `write_batch()` calls should be durable.
593    ///
594    /// # Ordering Guarantees
595    ///
596    /// - **Write ordering**: Events within a batch maintain their order
597    /// - **Batch ordering**: Batches are persisted in the order `write_batch()` was called
598    /// - **Happens-before**: A successful `flush()` establishes a happens-before relationship
599    ///   with all prior writes, guaranteeing visibility to subsequent reads
600    ///
601    /// # When to Call
602    ///
603    /// Call `flush()`:
604    /// - Before shutting down the application
605    /// - At transaction boundaries
606    /// - After critical batches that require durability
607    /// - Periodically to limit data loss window
608    ///
609    /// # Idempotency
610    ///
611    /// This method should be idempotent - calling it multiple times should be safe.
612    /// Subsequent calls with no intervening writes should be no-ops.
613    ///
614    /// # Blocking
615    ///
616    /// This method should block until all data is persisted. Don't return early with
617    /// "flush initiated" - wait for confirmation.
618    ///
619    /// # Error Handling
620    ///
621    /// If this method returns an error, some buffered data may be lost. Callers should:
622    /// - Consider the pipeline state corrupted
623    /// - Attempt recovery or fail fast
624    /// - Log the error with full context
625    ///
626    /// # Examples
627    ///
628    /// ```rust,no_run
629    /// # use rigatoni_core::destination::{Destination, DestinationError};
630    /// # use rigatoni_core::event::ChangeEvent;
631    /// # async fn example(mut dest: impl Destination, batches: Vec<Vec<ChangeEvent>>) -> Result<(), DestinationError> {
632    /// // Write multiple batches
633    /// for batch in batches {
634    ///     dest.write_batch(&batch).await?;
635    /// }
636    ///
637    /// // Ensure everything is persisted
638    /// dest.flush().await?;
639    /// # Ok(())
640    /// # }
641    /// ```
642    async fn flush(&mut self) -> Result<(), DestinationError>;
643
644    /// Returns whether this destination supports ACID transactions.
645    ///
646    /// Return `true` only if the destination provides full ACID guarantees:
647    /// - **Atomicity**: All or nothing for transaction operations
648    /// - **Consistency**: Transactions maintain invariants
649    /// - **Isolation**: Concurrent transactions don't interfere
650    /// - **Durability**: Committed data survives failures
651    ///
652    /// # Use Cases
653    ///
654    /// Transaction support enables:
655    /// - Exactly-once semantics across batches
656    /// - Coordinated multi-destination writes
657    /// - Reliable state management
658    ///
659    /// # Implementation Note
660    ///
661    /// This is a synchronous method (not async) because it should return a compile-time
662    /// constant. The result should not depend on runtime state.
663    ///
664    /// # Examples
665    ///
666    /// ```rust
667    /// # use rigatoni_core::destination::{Destination, DestinationError};
668    /// # async fn example(dest: impl Destination) {
669    /// if dest.supports_transactions() {
670    ///     println!("Destination supports ACID transactions");
671    /// } else {
672    ///     println!("Destination provides at-least-once semantics");
673    /// }
674    /// # }
675    /// ```
676    fn supports_transactions(&self) -> bool;
677
678    /// Closes the destination and releases resources.
679    ///
680    /// This method should:
681    /// 1. Call `flush()` to persist any buffered data
682    /// 2. Close network connections
683    /// 3. Release file handles
684    /// 4. Clean up temporary resources
685    ///
686    /// # Idempotency
687    ///
688    /// This method should be idempotent - safe to call multiple times.
689    /// Subsequent calls should be no-ops. Implementers SHOULD ensure multiple calls
690    /// are safe even after resource teardown (e.g., by tracking closed state).
691    ///
692    /// # Drop vs Close
693    ///
694    /// While Rust's `Drop` trait handles cleanup, `close()` is async and can return errors.
695    /// Always call `close()` explicitly for graceful shutdown. The `Drop` impl should:
696    /// - Log a warning if `close()` wasn't called
697    /// - Attempt best-effort cleanup
698    /// - Not panic
699    ///
700    /// # Default Implementation
701    ///
702    /// The default implementation just calls `flush()`. Override this method if you need
703    /// additional cleanup logic (connection closure, file handles, etc.).
704    ///
705    /// # Examples
706    ///
707    /// ```rust,no_run
708    /// # use rigatoni_core::destination::{Destination, DestinationError};
709    /// # async fn example(mut dest: impl Destination) -> Result<(), DestinationError> {
710    /// // Use the destination
711    /// // ...
712    ///
713    /// // Clean shutdown
714    /// dest.close().await?;
715    /// # Ok(())
716    /// # }
717    /// ```
718    async fn close(&mut self) -> Result<(), DestinationError> {
719        self.flush().await
720    }
721
722    /// Returns metadata about this destination's capabilities.
723    ///
724    /// This enables runtime introspection and dynamic pipeline configuration.
725    ///
726    /// # Default Implementation
727    ///
728    /// The default implementation returns basic metadata. Override to provide
729    /// destination-specific information.
730    ///
731    /// # Examples
732    ///
733    /// ```rust
734    /// # use rigatoni_core::destination::{Destination, DestinationError};
735    /// # async fn example(dest: impl Destination) {
736    /// let meta = dest.metadata();
737    /// println!("Destination: {}", meta.name);
738    /// println!("Supports transactions: {}", meta.supports_transactions);
739    ///
740    /// if let Some(max_batch) = meta.max_batch_size {
741    ///     println!("Max batch size: {}", max_batch);
742    /// }
743    /// # }
744    /// ```
745    fn metadata(&self) -> DestinationMetadata {
746        DestinationMetadata::new("Unknown", "unknown")
747            .with_transactions(self.supports_transactions())
748    }
749}