rigatoni_core/destination.rs
1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Destination Trait and Error Types
18//!
19//! This module defines the core [`Destination`] trait that all destination implementations
20//! must satisfy. Destinations are the final stage in the data replication pipeline where processed
21//! events are written to external systems like S3, `BigQuery`, Kafka, or databases.
22//!
23//! # Architecture
24//!
25//! The [`Destination`] trait provides a uniform interface for writing batches of events
26//! and managing destination state. It's designed to be:
27//!
28//! - **Async-first**: All operations are asynchronous for high throughput
29//! - **Batch-oriented**: Events are written in batches for efficiency
30//! - **Thread-safe**: Implementations must be `Send + Sync` for concurrent access
31//! - **Error-resilient**: Comprehensive error handling with retry capabilities
32//! - **Extensible**: Optional methods allow future additions without breaking changes
33//!
34//! # Design Rationale
35//!
36//! ## Why Batching?
37//!
38//! Most external systems perform better with batch operations rather than individual writes.
39//! The [`Destination::write_batch`] method allows implementations to:
40//! - Amortize network overhead across multiple events
41//! - Use bulk APIs for better throughput
42//! - Implement efficient memory management
43//!
44//! ## Why Separate `flush()`?
45//!
46//! The [`Destination::flush`] method is separate from [`Destination::write_batch`] because:
47//! - **Buffering control**: Destinations may buffer writes internally for efficiency
48//! - **Durability guarantees**: `flush()` provides a synchronization point for durability
49//! - **Transaction boundaries**: Allows grouping multiple batches into transactions
50//! - **Graceful shutdown**: Ensures all buffered data is written before termination
51//!
52//! ## Thread Safety
53//!
54//! All implementations must be `Send + Sync + 'static` to support:
55//! - Concurrent writes from multiple producers
56//! - Safe sharing across async task boundaries
57//! - Use in thread pools and work stealing schedulers
58//!
59//! # Examples
60//!
61//! ## Implementing a Custom Destination
62//!
63//! ```rust
64//! use rigatoni_core::destination::{Destination, DestinationError};
65//! use rigatoni_core::event::ChangeEvent;
66//! use async_trait::async_trait;
67//!
68//! /// A simple file-based destination
69//! pub struct FileDestination {
70//! path: String,
71//! buffer: Vec<String>,
72//! }
73//!
74//! #[async_trait]
75//! impl Destination for FileDestination {
76//! async fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), DestinationError> {
77//! // Buffer events as JSON lines
78//! for event in events {
79//! let json = serde_json::to_string(event)
80//! .map_err(|e| DestinationError::serialization(e, "Failed to serialize event"))?;
81//! self.buffer.push(json);
82//! }
83//! Ok(())
84//! }
85//!
86//! async fn flush(&mut self) -> Result<(), DestinationError> {
87//! // Write all buffered events to file
88//! if !self.buffer.is_empty() {
89//! // Implementation would write to file here
90//! self.buffer.clear();
91//! }
92//! Ok(())
93//! }
94//!
95//! fn supports_transactions(&self) -> bool {
96//! false // File writes don't support transactions
97//! }
98//!
99//! async fn close(&mut self) -> Result<(), DestinationError> {
100//! self.flush().await
101//! }
102//! }
103//! ```
104//!
105//! ## Using a Destination
106//!
107//! ```rust,no_run
108//! # use rigatoni_core::destination::{Destination, DestinationError};
109//! # use rigatoni_core::event::ChangeEvent;
110//! # async fn example(mut dest: impl Destination, events: Vec<ChangeEvent>) -> Result<(), DestinationError> {
111//! // Write a batch of events
112//! dest.write_batch(&events).await?;
113//!
114//! // Flush to ensure durability
115//! dest.flush().await?;
116//!
117//! // Clean shutdown
118//! dest.close().await?;
119//! # Ok(())
120//! # }
121//! ```
122//!
123//! # Error Handling
124//!
125//! The [`DestinationError`] type provides detailed error classification:
126//! - [`DestinationError::ConnectionError`]: Network/connection failures (retryable)
127//! - [`DestinationError::SerializationError`]: Data serialization failures (non-retryable)
128//! - [`DestinationError::WriteError`]: Write operation failures (may be retryable)
129//! - [`DestinationError::ConfigurationError`]: Invalid configuration (non-retryable)
130//! - [`DestinationError::CapacityError`]: Resource limits exceeded (backpressure signal)
131//!
132//! Each error variant includes context about retryability to help callers implement
133//! appropriate retry strategies.
134//!
135//! # Performance Considerations
136//!
137//! ## Trait Objects vs Generics
138//!
139//! The trait can be used in two ways:
140//!
141//! ```rust
142//! # use rigatoni_core::destination::Destination;
143//! // Static dispatch (zero-cost abstraction, monomorphization)
144//! async fn process_with_static<D: Destination>(dest: D) {
145//! // Compiler generates specialized code for each destination type
146//! }
147//!
148//! // Dynamic dispatch (runtime flexibility, single compiled function)
149//! async fn process_with_dynamic(dest: Box<dyn Destination>) {
150//! // Single compiled function, virtual dispatch overhead
151//! }
152//! ```
153//!
154//! **Trade-offs**:
155//! - Static dispatch: Faster (no vtable), larger binary (code duplication), compile-time type
156//! - Dynamic dispatch: Smaller binary, allows runtime destination selection, slight overhead
157//!
158//! For most data replication workloads, the difference is negligible compared to I/O costs.
159//!
160//! # Future Evolution
161//!
162//! The trait is designed for non-breaking evolution:
163//! - New optional methods can be added with default implementations
164//! - The [`Destination::metadata`] method provides capability discovery
165//! - Version negotiation can be added through metadata
166//!
167//! Planned additions:
168//! - `async fn begin_transaction(&mut self) -> Result<TransactionId, DestinationError>`
169//! - `async fn commit_transaction(&mut self, id: TransactionId) -> Result<(), DestinationError>`
170//! - `async fn health_check(&self) -> Result<HealthStatus, DestinationError>`
171
172use crate::event::ChangeEvent;
173use async_trait::async_trait;
174use std::collections::HashMap;
175use thiserror::Error;
176
177/// Errors that can occur when writing to a destination.
178///
179/// Each error variant includes information about whether the operation is retryable
180/// and provides context for error handling and observability.
181#[derive(Error, Debug)]
182pub enum DestinationError {
183 /// Connection to the destination failed.
184 ///
185 /// This is typically retryable after a backoff period. Examples include:
186 /// - Network timeouts
187 /// - DNS resolution failures
188 /// - Connection refused
189 /// - TLS handshake failures
190 #[error("Connection error: {message}")]
191 ConnectionError {
192 /// Human-readable error message
193 message: String,
194 /// The underlying connection error
195 #[source]
196 source: Option<Box<dyn std::error::Error + Send + Sync>>,
197 },
198
199 /// Failed to serialize events for writing.
200 ///
201 /// This is typically non-retryable as it indicates a data quality issue.
202 /// Examples include:
203 /// - Invalid UTF-8 sequences
204 /// - Schema validation failures
205 /// - Unsupported data types
206 #[error("Serialization error: {message}")]
207 SerializationError {
208 /// Human-readable error message
209 message: String,
210 /// The underlying serialization error
211 #[source]
212 source: Option<Box<dyn std::error::Error + Send + Sync>>,
213 },
214
215 /// Failed to write events to the destination.
216 ///
217 /// Retryability depends on the specific cause. Examples include:
218 /// - Quota exceeded (retryable after backoff)
219 /// - Permission denied (non-retryable)
220 /// - Destination unavailable (retryable)
221 /// - Invalid data format (non-retryable)
222 #[error("Write error: {message}")]
223 WriteError {
224 /// Human-readable error message
225 message: String,
226 /// Whether this specific write error is retryable
227 retryable: bool,
228 /// The underlying error
229 #[source]
230 source: Option<Box<dyn std::error::Error + Send + Sync>>,
231 },
232
233 /// Invalid destination configuration.
234 ///
235 /// This is non-retryable and indicates a programming or configuration error.
236 /// Examples include:
237 /// - Missing required configuration parameters
238 /// - Invalid connection strings
239 /// - Conflicting options
240 #[error("Configuration error: {message}")]
241 ConfigurationError {
242 /// Human-readable error message
243 message: String,
244 /// Configuration parameter name if applicable
245 parameter: Option<String>,
246 },
247
248 /// Destination capacity exceeded.
249 ///
250 /// This signals backpressure and should pause writes. Examples include:
251 /// - Write buffer full
252 /// - Rate limit exceeded
253 /// - Disk space exhausted
254 /// - Memory limit reached
255 #[error("Capacity error: {message}")]
256 CapacityError {
257 /// Human-readable error message
258 message: String,
259 /// Current capacity utilization (0.0 - 1.0)
260 utilization: Option<f64>,
261 /// Suggested wait time before retry
262 retry_after: Option<std::time::Duration>,
263 },
264
265 /// A generic error occurred.
266 ///
267 /// Used for errors that don't fit other categories or for wrapping
268 /// destination-specific errors.
269 #[error("Destination error: {message}")]
270 Other {
271 /// Human-readable error message
272 message: String,
273 /// Whether this error is retryable
274 retryable: bool,
275 /// The underlying error
276 #[source]
277 source: Option<Box<dyn std::error::Error + Send + Sync>>,
278 },
279}
280
281impl DestinationError {
282 /// Creates a connection error from any error type.
283 #[must_use]
284 pub fn connection(source: impl std::error::Error + Send + Sync + 'static) -> Self {
285 Self::ConnectionError {
286 message: source.to_string(),
287 source: Some(Box::new(source)),
288 }
289 }
290
291 /// Creates a connection error with a custom message.
292 #[must_use]
293 pub fn connection_msg(message: impl Into<String>) -> Self {
294 Self::ConnectionError {
295 message: message.into(),
296 source: None,
297 }
298 }
299
300 /// Creates a serialization error from any error type.
301 #[must_use]
302 pub fn serialization(
303 source: impl std::error::Error + Send + Sync + 'static,
304 message: impl Into<String>,
305 ) -> Self {
306 Self::SerializationError {
307 message: message.into(),
308 source: Some(Box::new(source)),
309 }
310 }
311
312 /// Creates a write error with retryability information.
313 #[must_use]
314 pub fn write(source: impl std::error::Error + Send + Sync + 'static, retryable: bool) -> Self {
315 Self::WriteError {
316 message: source.to_string(),
317 retryable,
318 source: Some(Box::new(source)),
319 }
320 }
321
322 /// Creates a write error with a custom message.
323 #[must_use]
324 pub fn write_msg(message: impl Into<String>, retryable: bool) -> Self {
325 Self::WriteError {
326 message: message.into(),
327 retryable,
328 source: None,
329 }
330 }
331
332 /// Creates a configuration error.
333 #[must_use]
334 pub fn configuration(message: impl Into<String>, parameter: Option<String>) -> Self {
335 Self::ConfigurationError {
336 message: message.into(),
337 parameter,
338 }
339 }
340
341 /// Creates a capacity error with optional metadata.
342 ///
343 /// # Panics
344 ///
345 /// Panics in debug builds if `utilization` is provided and not in range [0.0, 1.0].
346 #[must_use]
347 pub fn capacity(
348 message: impl Into<String>,
349 utilization: Option<f64>,
350 retry_after: Option<std::time::Duration>,
351 ) -> Self {
352 // Validate utilization range in debug builds
353 if let Some(u) = utilization {
354 debug_assert!(
355 (0.0..=1.0).contains(&u),
356 "utilization must be in range [0.0, 1.0], got {u}"
357 );
358 }
359
360 Self::CapacityError {
361 message: message.into(),
362 utilization,
363 retry_after,
364 }
365 }
366
367 /// Creates a generic error.
368 #[must_use]
369 pub fn other(source: impl std::error::Error + Send + Sync + 'static, retryable: bool) -> Self {
370 Self::Other {
371 message: source.to_string(),
372 retryable,
373 source: Some(Box::new(source)),
374 }
375 }
376
377 /// Returns whether this error is retryable.
378 ///
379 /// This helps callers implement appropriate retry strategies:
380 /// - Retryable errors should be retried with exponential backoff
381 /// - Non-retryable errors should fail fast or route to dead letter queue
382 #[must_use]
383 pub const fn is_retryable(&self) -> bool {
384 match self {
385 Self::ConnectionError { .. } | Self::CapacityError { .. } => true,
386 Self::SerializationError { .. } | Self::ConfigurationError { .. } => false,
387 Self::WriteError { retryable, .. } | Self::Other { retryable, .. } => *retryable,
388 }
389 }
390
391 /// Returns suggested wait time before retry, if applicable.
392 #[must_use]
393 pub const fn retry_after(&self) -> Option<std::time::Duration> {
394 match self {
395 Self::CapacityError { retry_after, .. } => *retry_after,
396 _ => None,
397 }
398 }
399}
400
401/// Metadata about a destination's capabilities.
402///
403/// This allows runtime introspection of destination features and is used for:
404/// - Feature detection and capability negotiation
405/// - Monitoring and observability
406/// - Dynamic pipeline configuration
407#[derive(Debug, Clone, PartialEq, Eq)]
408pub struct DestinationMetadata {
409 /// Human-readable destination name (e.g., "S3", "`BigQuery`", "Kafka")
410 pub name: String,
411
412 /// Destination type identifier (e.g., "s3", "bigquery", "kafka")
413 pub destination_type: String,
414
415 /// Whether the destination supports transactions
416 pub supports_transactions: bool,
417
418 /// Maximum batch size hint (None = no limit)
419 pub max_batch_size: Option<usize>,
420
421 /// Whether the destination supports concurrent writes
422 pub supports_concurrent_writes: bool,
423
424 /// Additional destination-specific metadata
425 pub properties: HashMap<String, String>,
426}
427
428impl DestinationMetadata {
429 /// Creates new metadata with required fields.
430 #[must_use]
431 pub fn new(name: impl Into<String>, destination_type: impl Into<String>) -> Self {
432 Self {
433 name: name.into(),
434 destination_type: destination_type.into(),
435 supports_transactions: false,
436 max_batch_size: None,
437 supports_concurrent_writes: true,
438 properties: HashMap::new(),
439 }
440 }
441
442 /// Sets transaction support.
443 #[must_use]
444 pub const fn with_transactions(mut self, supports: bool) -> Self {
445 self.supports_transactions = supports;
446 self
447 }
448
449 /// Sets maximum batch size.
450 #[must_use]
451 pub const fn with_max_batch_size(mut self, size: usize) -> Self {
452 self.max_batch_size = Some(size);
453 self
454 }
455
456 /// Sets concurrent write support.
457 #[must_use]
458 pub const fn with_concurrent_writes(mut self, supports: bool) -> Self {
459 self.supports_concurrent_writes = supports;
460 self
461 }
462
463 /// Adds a custom property.
464 #[must_use]
465 pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
466 self.properties.insert(key.into(), value.into());
467 self
468 }
469}
470
471/// The core destination trait for writing events to external systems.
472///
473/// All destination implementations (S3, `BigQuery`, Kafka, databases, etc.) must implement
474/// this trait. It provides a uniform interface for the Rigatoni pipeline to write events
475/// regardless of the underlying destination system.
476///
477/// # Thread Safety
478///
479/// Implementations must be `Send + Sync + 'static` to support:
480/// - Concurrent access from multiple async tasks
481/// - Safe transfer across thread boundaries
482/// - Use in work-stealing schedulers
483///
484/// # Async Considerations
485///
486/// All methods are async to support:
487/// - Non-blocking I/O operations
488/// - Efficient resource utilization
489/// - High concurrency without thread-per-connection
490///
491/// The `#[async_trait]` macro is used because native async trait methods have limitations
492/// in Rust (as of 2024). This macro transforms async trait methods into methods that
493/// return `Pin<Box<dyn Future + Send>>`, enabling dynamic dispatch while maintaining
494/// ergonomic async/await syntax.
495///
496/// # Error Handling
497///
498/// All fallible operations return [`DestinationError`] which provides:
499/// - Detailed error classification
500/// - Retryability information
501/// - Error context for observability
502/// - Source error chaining
503///
504/// Implementations should provide specific error information to help callers make
505/// informed retry decisions.
506///
507/// # Implementation Guide
508///
509/// When implementing this trait:
510///
511/// 1. **`write_batch`**: Should handle batch writes efficiently
512/// - Consider internal buffering for performance
513/// - Validate events before writing
514/// - Provide detailed errors for each failure mode
515/// - Handle partial failures appropriately
516///
517/// 2. **`flush`**: Must ensure all buffered data is persisted
518/// - Block until writes complete
519/// - Return errors if persistence fails
520/// - Safe to call multiple times (idempotent)
521///
522/// 3. **`supports_transactions`**: Indicate if ACID transactions are supported
523/// - Return `true` only if full ACID guarantees available
524/// - Consider returning `false` for "best effort" systems
525///
526/// 4. **`close`**: Clean up resources and ensure data durability
527/// - Call `flush()` internally
528/// - Close connections/files
529/// - Release resources
530/// - Safe to call multiple times (idempotent)
531///
532/// 5. **`metadata`**: Provide accurate capability information
533/// - Helps pipeline make optimization decisions
534/// - Enables dynamic configuration
535/// - Supports observability
536///
537/// # Examples
538///
539/// See module-level documentation for complete implementation examples.
540#[async_trait]
541pub trait Destination: Send + Sync {
542 /// Writes a batch of events to the destination.
543 ///
544 /// This is the primary method for writing data. Implementations should:
545 /// - Process events efficiently (consider batching to destination API)
546 /// - Maintain event ordering within the batch when possible
547 /// - Handle partial failures gracefully
548 /// - Provide detailed error information
549 ///
550 /// # Buffering
551 ///
552 /// Implementations may buffer events internally for performance. If buffering is used:
553 /// - Document the buffering behavior
554 /// - Implement `flush()` to persist buffered data
555 /// - Consider memory limits
556 ///
557 /// # Error Handling
558 ///
559 /// If this method returns an error:
560 /// - The batch should be considered failed
561 /// - Callers should check `DestinationError::is_retryable()`
562 /// - Partial writes should be rolled back if possible
563 ///
564 /// # Arguments
565 ///
566 /// * `events` - The batch of events to write. Empty batches should succeed without error.
567 ///
568 /// # Returns
569 ///
570 /// - `Ok(())` if the batch was written successfully (or buffered for later write)
571 /// - `Err(DestinationError)` if the write failed
572 ///
573 /// # Examples
574 ///
575 /// ```rust,no_run
576 /// # use rigatoni_core::destination::{Destination, DestinationError};
577 /// # use rigatoni_core::event::ChangeEvent;
578 /// # async fn example(mut dest: impl Destination, events: Vec<ChangeEvent>) -> Result<(), DestinationError> {
579 /// // Write a batch
580 /// dest.write_batch(&events).await?;
581 ///
582 /// // Empty batches are allowed
583 /// dest.write_batch(&[]).await?;
584 /// # Ok(())
585 /// # }
586 /// ```
587 async fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), DestinationError>;
588
589 /// Flushes any buffered data to ensure durability.
590 ///
591 /// This method ensures that all previously written data is persisted to the destination.
592 /// After `flush()` returns successfully, all prior `write_batch()` calls should be durable.
593 ///
594 /// # Ordering Guarantees
595 ///
596 /// - **Write ordering**: Events within a batch maintain their order
597 /// - **Batch ordering**: Batches are persisted in the order `write_batch()` was called
598 /// - **Happens-before**: A successful `flush()` establishes a happens-before relationship
599 /// with all prior writes, guaranteeing visibility to subsequent reads
600 ///
601 /// # When to Call
602 ///
603 /// Call `flush()`:
604 /// - Before shutting down the application
605 /// - At transaction boundaries
606 /// - After critical batches that require durability
607 /// - Periodically to limit data loss window
608 ///
609 /// # Idempotency
610 ///
611 /// This method should be idempotent - calling it multiple times should be safe.
612 /// Subsequent calls with no intervening writes should be no-ops.
613 ///
614 /// # Blocking
615 ///
616 /// This method should block until all data is persisted. Don't return early with
617 /// "flush initiated" - wait for confirmation.
618 ///
619 /// # Error Handling
620 ///
621 /// If this method returns an error, some buffered data may be lost. Callers should:
622 /// - Consider the pipeline state corrupted
623 /// - Attempt recovery or fail fast
624 /// - Log the error with full context
625 ///
626 /// # Examples
627 ///
628 /// ```rust,no_run
629 /// # use rigatoni_core::destination::{Destination, DestinationError};
630 /// # use rigatoni_core::event::ChangeEvent;
631 /// # async fn example(mut dest: impl Destination, batches: Vec<Vec<ChangeEvent>>) -> Result<(), DestinationError> {
632 /// // Write multiple batches
633 /// for batch in batches {
634 /// dest.write_batch(&batch).await?;
635 /// }
636 ///
637 /// // Ensure everything is persisted
638 /// dest.flush().await?;
639 /// # Ok(())
640 /// # }
641 /// ```
642 async fn flush(&mut self) -> Result<(), DestinationError>;
643
644 /// Returns whether this destination supports ACID transactions.
645 ///
646 /// Return `true` only if the destination provides full ACID guarantees:
647 /// - **Atomicity**: All or nothing for transaction operations
648 /// - **Consistency**: Transactions maintain invariants
649 /// - **Isolation**: Concurrent transactions don't interfere
650 /// - **Durability**: Committed data survives failures
651 ///
652 /// # Use Cases
653 ///
654 /// Transaction support enables:
655 /// - Exactly-once semantics across batches
656 /// - Coordinated multi-destination writes
657 /// - Reliable state management
658 ///
659 /// # Implementation Note
660 ///
661 /// This is a synchronous method (not async) because it should return a compile-time
662 /// constant. The result should not depend on runtime state.
663 ///
664 /// # Examples
665 ///
666 /// ```rust
667 /// # use rigatoni_core::destination::{Destination, DestinationError};
668 /// # async fn example(dest: impl Destination) {
669 /// if dest.supports_transactions() {
670 /// println!("Destination supports ACID transactions");
671 /// } else {
672 /// println!("Destination provides at-least-once semantics");
673 /// }
674 /// # }
675 /// ```
676 fn supports_transactions(&self) -> bool;
677
678 /// Closes the destination and releases resources.
679 ///
680 /// This method should:
681 /// 1. Call `flush()` to persist any buffered data
682 /// 2. Close network connections
683 /// 3. Release file handles
684 /// 4. Clean up temporary resources
685 ///
686 /// # Idempotency
687 ///
688 /// This method should be idempotent - safe to call multiple times.
689 /// Subsequent calls should be no-ops. Implementers SHOULD ensure multiple calls
690 /// are safe even after resource teardown (e.g., by tracking closed state).
691 ///
692 /// # Drop vs Close
693 ///
694 /// While Rust's `Drop` trait handles cleanup, `close()` is async and can return errors.
695 /// Always call `close()` explicitly for graceful shutdown. The `Drop` impl should:
696 /// - Log a warning if `close()` wasn't called
697 /// - Attempt best-effort cleanup
698 /// - Not panic
699 ///
700 /// # Default Implementation
701 ///
702 /// The default implementation just calls `flush()`. Override this method if you need
703 /// additional cleanup logic (connection closure, file handles, etc.).
704 ///
705 /// # Examples
706 ///
707 /// ```rust,no_run
708 /// # use rigatoni_core::destination::{Destination, DestinationError};
709 /// # async fn example(mut dest: impl Destination) -> Result<(), DestinationError> {
710 /// // Use the destination
711 /// // ...
712 ///
713 /// // Clean shutdown
714 /// dest.close().await?;
715 /// # Ok(())
716 /// # }
717 /// ```
718 async fn close(&mut self) -> Result<(), DestinationError> {
719 self.flush().await
720 }
721
722 /// Returns metadata about this destination's capabilities.
723 ///
724 /// This enables runtime introspection and dynamic pipeline configuration.
725 ///
726 /// # Default Implementation
727 ///
728 /// The default implementation returns basic metadata. Override to provide
729 /// destination-specific information.
730 ///
731 /// # Examples
732 ///
733 /// ```rust
734 /// # use rigatoni_core::destination::{Destination, DestinationError};
735 /// # async fn example(dest: impl Destination) {
736 /// let meta = dest.metadata();
737 /// println!("Destination: {}", meta.name);
738 /// println!("Supports transactions: {}", meta.supports_transactions);
739 ///
740 /// if let Some(max_batch) = meta.max_batch_size {
741 /// println!("Max batch size: {}", max_batch);
742 /// }
743 /// # }
744 /// ```
745 fn metadata(&self) -> DestinationMetadata {
746 DestinationMetadata::new("Unknown", "unknown")
747 .with_transactions(self.supports_transactions())
748 }
749}