# Caudal SDK (Rust)
[](https://crates.io/crates/caudal-sdk)
[](https://docs.rs/caudal-sdk)
[](https://github.com/JesusCabreraReveles/caudal-sdk-rs)
Observability SDK for event-driven distributed systems.
## What is Caudal?
Caudal is an observability system for event flows. It is **NOT** a broker, it **DOES NOT** participate in the critical path, and it **DOES NOT** guarantee delivery. It only observes, records, and correlates events describing how a message flows through a distributed system.
## Features
- ✅ **Append-only**: Only records events, never modifies them.
- ✅ **Event-only**: Does not transport business payloads.
- ✅ **Non-blocking**: BufferedEmitter for non-blocking emission.
- ✅ **Idempotent**: Safe for retries.
- ✅ **Production-safe**: Never breaks the system using it.
- ✅ **Transport-agnostic**: Decoupled through traits.
- ✅ **Kafka/Redpanda**: Optional implementation via feature flag.
- ✅ **Async support**: Support for async/await.
- ✅ **Metrics**: Internal emission metrics.
## Installation
Add this to your `Cargo.toml`:
```toml
[dependencies]
caudal-sdk = "0.1.0"
# With JSON serialization
caudal-sdk = { version = "0.1.0", features = ["serde_json"] }
# With Kafka support
caudal-sdk = { version = "0.1.0", features = ["kafka"] }
# With async support
caudal-sdk = { version = "0.1.0", features = ["async-runtime"] }
# All features
caudal-sdk = { version = "0.1.0", features = ["serde_json", "kafka", "async-runtime"] }
```
## Basic Usage
### Creating and Emitting Events
```rust
use caudal_sdk::{
FlowEventBuilder, CaudalEmitter,
NodeType, FlowStage, FlowStatus, AckType,
};
// Create an event (flow_id is required and represents the logical unit)
let event = FlowEventBuilder::new("siscom-consumer")
.group_flow("siscom") // Optional: grouping for Realtime/UI
.node("api-gateway", NodeType::Producer)
.stage(FlowStage::Emit)
.status(FlowStatus::Sent)
.ack(AckType::Implicit)
.payload_bytes(190) // Optional: Logical size
.input_bytes(190) // Optional: Input bytes
.output_bytes(210) // Optional: Output bytes
.meta("endpoint", "/api/users")
.build()?;
// event_id is generated automatically as UUID v4 if not specified
// Emit with custom transport
let emitter = CaudalEmitter::new(my_transport);
emitter.emit_strict(event)?;
// Or best-effort mode (never fails)
emitter.emit_best_effort(event);
```
### BufferedEmitter (Non-Blocking)
```rust
use caudal_sdk::BufferedEmitter;
use caudal_sdk::emitter::buffered::{BufferedConfig, DropPolicy};
let config = BufferedConfig {
buffer_size: 1000,
drop_policy: DropPolicy::DropNewest,
flush_interval: None,
};
let emitter = BufferedEmitter::new(transport, config);
// Emit without blocking (never fails)
emitter.emit_best_effort(event);
// Get metrics
let metrics = emitter.metrics();
println!("Sent: {}, Dropped: {}", metrics.events_sent, metrics.events_dropped);
```
### Kafka Transport
```rust
use caudal_sdk::transport::kafka::{KafkaTransport, KafkaConfig};
let config = KafkaConfig {
bootstrap_servers: "localhost:9092".to_string(),
topic: "caudal-events".to_string(),
acks: "all".to_string(),
..Default::default()
};
let transport = KafkaTransport::new(config)?;
let emitter = CaudalEmitter::new(transport);
```
## Features
| `serde_json` | JSON serialization of events | `serde_json` |
| `kafka` | Kafka/Redpanda transport | `rdkafka`, `tokio` |
| `async-runtime` | async/await support | `async-trait`, `tokio` |
| `tracing` | Integration with tracing | `tracing` |
## FlowEvent v1 Contract
Each event includes:
```rust
pub struct FlowEvent {
pub spec_version: String, // "caudal.v1"
pub event_id: Uuid, // Unique event ID (tracing)
pub flow_id: String, // Flow ID (logical unit)
pub group_flow: Option<String>, // Logical grouping (realtime/UI)
pub node_id: String, // Emitting node ID
pub node_type: NodeType, // Producer, Consumer, Bridge, etc.
pub timestamp: DateTime<Utc>, // UTC Timestamp
pub stage: FlowStage, // Ingress, Emit, Process, etc.
pub status: FlowStatus, // Sent, Received, Ack, etc.
pub ack: AckInfo, // Acknowledgment information
pub metadata: EventMetadata, // Structured metadata (see below)
pub error: Option<FlowError>, // Error information
}
pub struct EventMetadata {
pub event_id: Option<Uuid>, // Event ID (redundant)
pub bytes: Option<String>, // Legacy bytes (string)
pub payload_bytes: Option<u64>, // Logical size
pub input_bytes: Option<u64>, // Input bytes
pub output_bytes: Option<u64>, // Output bytes
pub extra: HashMap<String, String>, // Other dynamic fields
}
```
### Enums
- **NodeType**: `Producer`, `Bridge`, `Consumer`, `Storage`, `External`
- **FlowStage**: `Ingress`, `Emit`, `Bridge`, `Consume`, `Process`, `Persist`, `Egress`, `Error`
- **FlowStatus**: `Sent`, `Received`, `Ack`, `Nack`, `Timeout`, `Skipped`
- **AckType**: `None`, `Implicit`, `Explicit`, `Bridge`
## Canonical Event Pattern
Applications **SHOULD**:
- **Define domain events explicitly**: Model your business events clearly before emitting flow events.
- **Emit a single "processed" event per flow**: Each flow should have one definitive completion event.
- **Emit explicit ACK events for external systems**: When integrating with external services, emit explicit acknowledgment events.
- **Correlate using flow_id**: Use the same `flow_id` across all events in a single flow for proper correlation.
### Example Flow
```rust
let flow_id = "order-processing";
let group = "siscom";
// 1. Ingress event
let ingress = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("api-gateway", NodeType::Producer)
.stage(FlowStage::Ingress)
.status(FlowStatus::Received)
.build()?;
// 2. Processing event
let processing = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("order-service", NodeType::Consumer)
.stage(FlowStage::Process)
.status(FlowStatus::Received)
.build()?;
// 3. External ACK event
let ack = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("payment-gateway", NodeType::External)
.stage(FlowStage::Egress)
.status(FlowStatus::Ack)
.ack(AckType::Explicit)
.build()?;
// 4. Final "processed" event
let processed = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("order-service", NodeType::Consumer)
.stage(FlowStage::Process)
.status(FlowStatus::Ack)
.meta("order_id", "12345")
.build()?;
```
## Implementing a Transport
```rust
use caudal_sdk::{CaudalTransport, FlowEvent};
use anyhow::Result;
struct MyTransport {
// Your configuration
}
impl CaudalTransport for MyTransport {
fn emit(&self, event: FlowEvent) -> Result<()> {
// Implement event emission
// For example, to Redis, HTTP, etc.
Ok(())
}
}
```
## Kafka Configuration
```rust
let config = KafkaConfig {
bootstrap_servers: "localhost:9092".to_string(),
topic: "caudal-events".to_string(),
acks: "all".to_string(), // "all", "1", "0"
linger_ms: Some(10), // Batching delay
batch_size: Some(16384), // Batch size in bytes
timeout_ms: 5000, // Timeout for delivery
};
```
### Acks Configuration
- `"all"` or `"-1"`: Maximum durability (waits for all replicas)
- `"1"`: Waits only for the leader
- `"0"`: Fire-and-forget (no acknowledgment)
## Guarantees and Limitations
### ✅ Guarantees
- **Non-blocking**: BufferedEmitter never blocks the production thread.
- **Production-safe**: Manageable errors, never panics.
- **Idempotent**: Safe for retries.
- **Thread-safe**: All components are Send + Sync.
### ⚠️ Limitations
- **Best-effort**: Does not guarantee delivery (it is not a broker).
- **Non-transactional**: Does not participate in distributed transactions.
- **Metadata only**: Does not send business payloads.
- **Append-only**: Does not modify or delete events.
## Examples
See the `examples/` directory for full examples:
```bash
# Basic example
cargo run --example basic_sync --features serde_json
# BufferedEmitter
cargo run --example buffered
# Kafka producer (requires local Kafka)
cargo run --example kafka_producer --features kafka
```
## Testing
```bash
# Unit tests
cargo test
# Tests with all features
cargo test --all-features
# Clippy
cargo clippy --all-features -- -D warnings
# Format
cargo fmt -- --check
```
## Roadmap
- [x] FlowEvent v1 Contract
- [x] Ergonomic Builder
- [x] Abstract Transport (traits)
- [x] Non-blocking BufferedEmitter
- [x] Kafka/Redpanda Implementation
- [x] async/await Support
- [x] JSON Serialization
- [x] Unit Tests
- [ ] Configurable retry policies
- [ ] Circuit breaker
- [ ] Prometheus metrics
- [ ] Intelligent batching
- [ ] Event compression
> [!NOTE]
> Future versions may provide helper builders for common patterns,
> once enough real-world usage patterns are validated.
## License
MIT OR Apache-2.0
## Contributing
Contributions are welcome! Please open an issue or PR.
## Support
- 📖 [Documentación](https://docs.rs/caudal-sdk)
- 🐛 [Issues](https://github.com/JesusCabreraReveles/caudal-sdk-rs/issues)
- 💬 [Discussions](https://github.com/JesusCabreraReveles/caudal-sdk-rs/discussions)