# Caudal SDK (Rust)

[![Crates.io](https://img.shields.io/crates/v/caudal-sdk.svg)](https://crates.io/crates/caudal-sdk)
[![Documentation](https://docs.rs/caudal-sdk/badge.svg)](https://docs.rs/caudal-sdk)
[![License](https://img.shields.io/crates/l/caudal-sdk.svg)](https://github.com/JesusCabreraReveles/caudal-sdk-rs)

Observability SDK for event-driven distributed systems.

## What is Caudal?

Caudal is an observability system for event flows. It is **NOT** a broker, it **DOES NOT** participate in the critical path, and it **DOES NOT** guarantee delivery. It only observes, records, and correlates events describing how a message flows through a distributed system.

## Features

- **Append-only**: Only records events, never modifies them.
- **Event-only**: Does not transport business payloads, only event metadata.
- **Non-blocking**: `BufferedEmitter` never blocks the emitting thread.
- **Idempotent**: Safe for retries.
- **Production-safe**: Never breaks the system using it.
- **Transport-agnostic**: Transports are decoupled through traits.
- **Kafka/Redpanda**: Optional implementation via feature flag.
- **Async support**: Optional async/await support via feature flag.
- **Metrics**: Internal emission metrics.

## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
# Pick exactly one of the following lines:
caudal-sdk = "0.1.0"

# With JSON serialization
caudal-sdk = { version = "0.1.0", features = ["serde_json"] }

# With Kafka support
caudal-sdk = { version = "0.1.0", features = ["kafka"] }

# With async support
caudal-sdk = { version = "0.1.0", features = ["async-runtime"] }

# All features
caudal-sdk = { version = "0.1.0", features = ["serde_json", "kafka", "async-runtime"] }
```

## Basic Usage

### Creating and Emitting Events

```rust
use caudal_sdk::{
    FlowEventBuilder, CaudalEmitter,
    NodeType, FlowStage, FlowStatus, AckType,
};

// Create an event (flow_id is required and represents the logical unit)
let event = FlowEventBuilder::new("siscom-consumer")
    .group_flow("siscom") // Optional: grouping for Realtime/UI
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Emit)
    .status(FlowStatus::Sent)
    .ack(AckType::Implicit)
    .payload_bytes(190)   // Optional: Logical size
    .input_bytes(190)    // Optional: Input bytes
    .output_bytes(210)   // Optional: Output bytes
    .meta("endpoint", "/api/users")
    .build()?;

// event_id is generated automatically as UUID v4 if not specified

// Emit with a custom transport (strict mode surfaces errors to the caller)
let emitter = CaudalEmitter::new(my_transport);
emitter.emit_strict(event)?;

// Or use best-effort mode instead, which never fails:
// emitter.emit_best_effort(event);
```
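
If you need strict mode but still want production safety, one option is to log failures and move on. A minimal sketch, assuming `CaudalEmitter` is generic over its transport type and that `emit_strict` returns an `anyhow::Result` like the transport trait shown later in this README:

```rust
use caudal_sdk::{CaudalEmitter, CaudalTransport, FlowEvent};

// Hypothetical helper: observability failures are logged, never propagated.
fn emit_or_log<T: CaudalTransport>(emitter: &CaudalEmitter<T>, event: FlowEvent) {
    if let Err(err) = emitter.emit_strict(event) {
        eprintln!("caudal: failed to emit flow event: {err}");
    }
}
```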

### BufferedEmitter (Non-Blocking)

```rust
use caudal_sdk::BufferedEmitter;
use caudal_sdk::emitter::buffered::{BufferedConfig, DropPolicy};

let config = BufferedConfig {
    buffer_size: 1000,
    drop_policy: DropPolicy::DropNewest,
    flush_interval: None,
};

let emitter = BufferedEmitter::new(transport, config);

// Emit without blocking (never fails)
emitter.emit_best_effort(event);

// Get metrics
let metrics = emitter.metrics();
println!("Sent: {}, Dropped: {}", metrics.events_sent, metrics.events_dropped);
```
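
Because all components are `Send + Sync`, the emitter can be shared behind an `Arc` and its drop counts surfaced from a background thread. A small sketch using only the metrics fields shown above:

```rust
use std::sync::Arc;
use std::{thread, time::Duration};

let emitter = Arc::new(BufferedEmitter::new(transport, config));

// Watchdog: log drops periodically so buffer sizing problems stay visible.
let watchdog = Arc::clone(&emitter);
thread::spawn(move || loop {
    thread::sleep(Duration::from_secs(60));
    let m = watchdog.metrics();
    if m.events_dropped > 0 {
        eprintln!(
            "caudal: {} events dropped so far ({} sent)",
            m.events_dropped, m.events_sent
        );
    }
});
```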

### Kafka Transport

```rust
use caudal_sdk::CaudalEmitter;
use caudal_sdk::transport::kafka::{KafkaTransport, KafkaConfig};

let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),
    ..Default::default()
};

let transport = KafkaTransport::new(config)?;
let emitter = CaudalEmitter::new(transport);
```
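
In practice the two pieces compose: the Kafka transport handles delivery while a `BufferedEmitter` keeps emission off the hot path. A sketch, assuming `BufferedEmitter` accepts any `CaudalTransport`, including `KafkaTransport`:

```rust
use caudal_sdk::BufferedEmitter;
use caudal_sdk::emitter::buffered::{BufferedConfig, DropPolicy};
use caudal_sdk::transport::kafka::{KafkaTransport, KafkaConfig};

let transport = KafkaTransport::new(KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    ..Default::default()
})?;

// Buffer in front of Kafka so broker latency never blocks the caller.
let emitter = BufferedEmitter::new(
    transport,
    BufferedConfig {
        buffer_size: 1000,
        drop_policy: DropPolicy::DropNewest,
        flush_interval: None,
    },
);

// `event` built with FlowEventBuilder as shown earlier.
emitter.emit_best_effort(event);
```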

## Feature Flags

| Feature | Description | Dependencies |
| :--- | :--- | :--- |
| `serde_json` | JSON serialization of events | `serde_json` |
| `kafka` | Kafka/Redpanda transport | `rdkafka`, `tokio` |
| `async-runtime` | async/await support | `async-trait`, `tokio` |
| `tracing` | Integration with tracing | `tracing` |

## FlowEvent v1 Contract

Each event includes:

```rust
pub struct FlowEvent {
    pub spec_version: String,      // "caudal.v1"
    pub event_id: Uuid,             // Unique event ID (tracing)
    pub flow_id: String,           // Flow ID (logical unit)
    pub group_flow: Option<String>, // Logical grouping (realtime/UI)
    pub node_id: String,            // Emitting node ID
    pub node_type: NodeType,        // Producer, Consumer, Bridge, etc.
    pub timestamp: DateTime<Utc>,   // UTC Timestamp
    pub stage: FlowStage,           // Ingress, Emit, Process, etc.
    pub status: FlowStatus,         // Sent, Received, Ack, etc.
    pub ack: AckInfo,               // Acknowledgment information
    pub metadata: EventMetadata,    // Structured metadata (see below)
    pub error: Option<FlowError>,   // Error information
}

pub struct EventMetadata {
    pub event_id: Option<Uuid>,      // Event ID (redundant copy of the top-level field)
    pub bytes: Option<String>,       // Legacy bytes (string)
    pub payload_bytes: Option<u64>,  // Logical size
    pub input_bytes: Option<u64>,    // Input bytes
    pub output_bytes: Option<u64>,   // Output bytes
    pub extra: HashMap<String, String>, // Other dynamic fields
}
```
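
With the `serde_json` feature enabled, events can be serialized for inspection or transport. A sketch, assuming `FlowEvent` implements `serde::Serialize` under that feature (which the feature description suggests):

```rust
use caudal_sdk::{FlowEventBuilder, NodeType, FlowStage, FlowStatus};

let event = FlowEventBuilder::new("order-processing")
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Emit)
    .status(FlowStatus::Sent)
    .build()?;

// Pretty-print the full FlowEvent v1 payload.
let json = serde_json::to_string_pretty(&event)?;
println!("{json}");
```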

### Enums

- **NodeType**: `Producer`, `Bridge`, `Consumer`, `Storage`, `External`
- **FlowStage**: `Ingress`, `Emit`, `Bridge`, `Consume`, `Process`, `Persist`, `Egress`, `Error`
- **FlowStatus**: `Sent`, `Received`, `Ack`, `Nack`, `Timeout`, `Skipped`
- **AckType**: `None`, `Implicit`, `Explicit`, `Bridge`
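
Downstream consumers can match on these enums directly. For illustration only (which statuses count as terminal is an assumption for this example, not part of the contract):

```rust
use caudal_sdk::FlowStatus;

// Example classification a dashboard might apply to incoming events.
fn is_terminal(status: &FlowStatus) -> bool {
    matches!(
        status,
        FlowStatus::Ack | FlowStatus::Nack | FlowStatus::Timeout | FlowStatus::Skipped
    )
}
```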

## Canonical Event Pattern

Applications **SHOULD**:

- **Define domain events explicitly**: Model your business events clearly before emitting flow events.
- **Emit a single "processed" event per flow**: Each flow should have one definitive completion event.
- **Emit explicit ACK events for external systems**: When integrating with external services, emit explicit acknowledgment events.
- **Correlate using flow_id**: Use the same `flow_id` across all events in a single flow for proper correlation.

### Example Flow

```rust
let flow_id = "order-processing";
let group = "siscom";

// 1. Ingress event
let ingress = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Ingress)
    .status(FlowStatus::Received)
    .build()?;

// 2. Processing event
let processing = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("order-service", NodeType::Consumer)
    .stage(FlowStage::Process)
    .status(FlowStatus::Received)
    .build()?;

// 3. External ACK event
let ack = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("payment-gateway", NodeType::External)
    .stage(FlowStage::Egress)
    .status(FlowStatus::Ack)
    .ack(AckType::Explicit)
    .build()?;

// 4. Final "processed" event
let processed = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("order-service", NodeType::Consumer)
    .stage(FlowStage::Process)
    .status(FlowStatus::Ack)
    .meta("order_id", "12345")
    .build()?;
```
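
The four events can then be emitted with any emitter from the earlier sections; best-effort mode keeps the business path unaffected:

```rust
// Assumes `emitter` was constructed as in the earlier examples.
for event in [ingress, processing, ack, processed] {
    emitter.emit_best_effort(event);
}
```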

## Implementing a Transport

```rust
use caudal_sdk::{CaudalTransport, FlowEvent};
use anyhow::Result;

struct MyTransport {
    // Your configuration
}

impl CaudalTransport for MyTransport {
    fn emit(&self, event: FlowEvent) -> Result<()> {
        // Implement event emission
        // For example, to Redis, HTTP, etc.
        Ok(())
    }
}
```
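
For tests, a transport that simply collects events in memory is often enough. A sketch (not part of the SDK) built on the same trait:

```rust
use std::sync::Mutex;

use anyhow::Result;
use caudal_sdk::{CaudalTransport, FlowEvent};

/// Illustrative test transport: stores every emitted event in memory.
#[derive(Default)]
struct InMemoryTransport {
    events: Mutex<Vec<FlowEvent>>,
}

impl CaudalTransport for InMemoryTransport {
    fn emit(&self, event: FlowEvent) -> Result<()> {
        self.events.lock().unwrap().push(event);
        Ok(())
    }
}
```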

## Kafka Configuration

```rust
let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),        // "all", "1", "0"
    linger_ms: Some(10),            // Batching delay
    batch_size: Some(16384),        // Batch size in bytes
    timeout_ms: 5000,               // Timeout for delivery
};
```

### Acks Configuration

- `"all"` or `"-1"`: Maximum durability (waits for all replicas)
- `"1"`: Waits only for the leader
- `"0"`: Fire-and-forget (no acknowledgment)

## Guarantees and Limitations

### ✅ Guarantees

- **Non-blocking**: BufferedEmitter never blocks the production thread.
- **Production-safe**: Manageable errors, never panics.
- **Idempotent**: Safe for retries.
- **Thread-safe**: All components are Send + Sync.

### ⚠️ Limitations

- **Best-effort**: Does not guarantee delivery (it is not a broker).
- **Non-transactional**: Does not participate in distributed transactions.
- **Metadata only**: Does not send business payloads.
- **Append-only**: Does not modify or delete events.

## Examples

See the `examples/` directory for full examples:

```bash
# Basic example
cargo run --example basic_sync --features serde_json

# BufferedEmitter
cargo run --example buffered

# Kafka producer (requires local Kafka)
cargo run --example kafka_producer --features kafka
```

## Testing

```bash
# Unit tests
cargo test

# Tests with all features
cargo test --all-features

# Clippy
cargo clippy --all-features -- -D warnings

# Format
cargo fmt -- --check
```

## Roadmap

- [x] FlowEvent v1 Contract
- [x] Ergonomic Builder
- [x] Abstract Transport (traits)
- [x] Non-blocking BufferedEmitter
- [x] Kafka/Redpanda Implementation
- [x] async/await Support
- [x] JSON Serialization
- [x] Unit Tests
- [ ] Configurable retry policies
- [ ] Circuit breaker
- [ ] Prometheus metrics
- [ ] Intelligent batching
- [ ] Event compression

> [!NOTE]
> Future versions may provide helper builders for common patterns,
> once enough real-world usage patterns are validated.

## License

MIT OR Apache-2.0

## Contributing

Contributions are welcome! Please open an issue or PR.

## Support

- 📖 [Documentation](https://docs.rs/caudal-sdk)
- 🐛 [Issues](https://github.com/JesusCabreraReveles/caudal-sdk-rs/issues)
- 💬 [Discussions](https://github.com/JesusCabreraReveles/caudal-sdk-rs/discussions)