caudal-sdk 0.1.0

Caudal SDK - Event observability for distributed systems
Documentation

Caudal SDK (Rust)

Crates.io Documentation License

Observability SDK for event-driven distributed systems.

What is Caudal?

Caudal is an observability system for event flows. It is NOT a broker, it DOES NOT participate in the critical path, and it DOES NOT guarantee delivery. It only observes, records, and correlates events describing how a message flows through a distributed system.

Features

  • Append-only: Only records events, never modifies them.
  • Event-only: Does not transport business payloads.
  • Non-blocking: BufferedEmitter for non-blocking emission.
  • Idempotent: Safe for retries.
  • Production-safe: Never breaks the system using it.
  • Transport-agnostic: Decoupled through traits.
  • Kafka/Redpanda: Optional implementation via feature flag.
  • Async support: Support for async/await.
  • Metrics: Internal emission metrics.

Installation

Add this to your Cargo.toml:

[dependencies]
caudal-sdk = "0.1.0"

# With JSON serialization
caudal-sdk = { version = "0.1.0", features = ["serde_json"] }

# With Kafka support
caudal-sdk = { version = "0.1.0", features = ["kafka"] }

# With async support
caudal-sdk = { version = "0.1.0", features = ["async-runtime"] }

# All features
caudal-sdk = { version = "0.1.0", features = ["serde_json", "kafka", "async-runtime"] }

Basic Usage

Creating and Emitting Events

use caudal_sdk::{
    FlowEventBuilder, CaudalEmitter,
    NodeType, FlowStage, FlowStatus, AckType,
};

// Create an event (flow_id is required and represents the logical unit)
let event = FlowEventBuilder::new("siscom-consumer")
    .group_flow("siscom") // Optional: grouping for Realtime/UI
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Emit)
    .status(FlowStatus::Sent)
    .ack(AckType::Implicit)
    .payload_bytes(190)   // Optional: Logical size
    .input_bytes(190)    // Optional: Input bytes
    .output_bytes(210)   // Optional: Output bytes
    .meta("endpoint", "/api/users")
    .build()?;

// event_id is generated automatically as UUID v4 if not specified

// Emit with custom transport
let emitter = CaudalEmitter::new(my_transport);
emitter.emit_strict(event)?;

// Or best-effort mode (never fails)
emitter.emit_best_effort(event);

BufferedEmitter (Non-Blocking)

use caudal_sdk::BufferedEmitter;
use caudal_sdk::emitter::buffered::{BufferedConfig, DropPolicy};

let config = BufferedConfig {
    buffer_size: 1000,
    drop_policy: DropPolicy::DropNewest,
    flush_interval: None,
};

let emitter = BufferedEmitter::new(transport, config);

// Emit without blocking (never fails)
emitter.emit_best_effort(event);

// Get metrics
let metrics = emitter.metrics();
println!("Sent: {}, Dropped: {}", metrics.events_sent, metrics.events_dropped);

Kafka Transport

use caudal_sdk::transport::kafka::{KafkaTransport, KafkaConfig};

let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),
    ..Default::default()
};

let transport = KafkaTransport::new(config)?;
let emitter = CaudalEmitter::new(transport);

Features

Feature Description Dependencies
serde_json JSON serialization of events serde_json
kafka Kafka/Redpanda transport rdkafka, tokio
async-runtime async/await support async-trait, tokio
tracing Integration with tracing tracing

FlowEvent v1 Contract

Each event includes:

pub struct FlowEvent {
    pub spec_version: String,      // "caudal.v1"
    pub event_id: Uuid,             // Unique event ID (tracing)
    pub flow_id: String,           // Flow ID (logical unit)
    pub group_flow: Option<String>, // Logical grouping (realtime/UI)
    pub node_id: String,            // Emitting node ID
    pub node_type: NodeType,        // Producer, Consumer, Bridge, etc.
    pub timestamp: DateTime<Utc>,   // UTC Timestamp
    pub stage: FlowStage,           // Ingress, Emit, Process, etc.
    pub status: FlowStatus,         // Sent, Received, Ack, etc.
    pub ack: AckInfo,               // Acknowledgment information
    pub metadata: EventMetadata,    // Structured metadata (see below)
    pub error: Option<FlowError>,   // Error information
}

pub struct EventMetadata {
    pub event_id: Option<Uuid>,      // Event ID (redundant)
    pub bytes: Option<String>,       // Legacy bytes (string)
    pub payload_bytes: Option<u64>,  // Logical size
    pub input_bytes: Option<u64>,    // Input bytes
    pub output_bytes: Option<u64>,   // Output bytes
    pub extra: HashMap<String, String>, // Other dynamic fields
}

Enums

  • NodeType: Producer, Bridge, Consumer, Storage, External
  • FlowStage: Ingress, Emit, Bridge, Consume, Process, Persist, Egress, Error
  • FlowStatus: Sent, Received, Ack, Nack, Timeout, Skipped
  • AckType: None, Implicit, Explicit, Bridge

Canonical Event Pattern

Applications SHOULD:

  • Define domain events explicitly: Model your business events clearly before emitting flow events.
  • Emit a single "processed" event per flow: Each flow should have one definitive completion event.
  • Emit explicit ACK events for external systems: When integrating with external services, emit explicit acknowledgment events.
  • Correlate using flow_id: Use the same flow_id across all events in a single flow for proper correlation.

Example Flow

let flow_id = "order-processing";
let group = "siscom";

// 1. Ingress event
let ingress = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Ingress)
    .status(FlowStatus::Received)
    .build()?;

// 2. Processing event
let processing = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("order-service", NodeType::Consumer)
    .stage(FlowStage::Process)
    .status(FlowStatus::Received)
    .build()?;

// 3. External ACK event
let ack = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("payment-gateway", NodeType::External)
    .stage(FlowStage::Egress)
    .status(FlowStatus::Ack)
    .ack(AckType::Explicit)
    .build()?;

// 4. Final "processed" event
let processed = FlowEventBuilder::new(flow_id)
    .group_flow(group)
    .node("order-service", NodeType::Consumer)
    .stage(FlowStage::Process)
    .status(FlowStatus::Ack)
    .meta("order_id", "12345")
    .build()?;

Implementing a Transport

use caudal_sdk::{CaudalTransport, FlowEvent};
use anyhow::Result;

struct MyTransport {
    // Your configuration
}

impl CaudalTransport for MyTransport {
    fn emit(&self, event: FlowEvent) -> Result<()> {
        // Implement event emission
        // For example, to Redis, HTTP, etc.
        Ok(())
    }
}

Kafka Configuration

let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),        // "all", "1", "0"
    linger_ms: Some(10),            // Batching delay
    batch_size: Some(16384),        // Batch size in bytes
    timeout_ms: 5000,               // Timeout for delivery
};

Acks Configuration

  • "all" or "-1": Maximum durability (waits for all replicas)
  • "1": Waits only for the leader
  • "0": Fire-and-forget (no acknowledgment)

Guarantees and Limitations

✅ Guarantees

  • Non-blocking: BufferedEmitter never blocks the production thread.
  • Production-safe: Manageable errors, never panics.
  • Idempotent: Safe for retries.
  • Thread-safe: All components are Send + Sync.

⚠️ Limitations

  • Best-effort: Does not guarantee delivery (it is not a broker).
  • Non-transactional: Does not participate in distributed transactions.
  • Metadata only: Does not send business payloads.
  • Append-only: Does not modify or delete events.

Examples

See the examples/ directory for full examples:

# Basic example
cargo run --example basic_sync --features serde_json

# BufferedEmitter
cargo run --example buffered

# Kafka producer (requires local Kafka)
cargo run --example kafka_producer --features kafka

Testing

# Unit tests
cargo test

# Tests with all features
cargo test --all-features

# Clippy
cargo clippy --all-features -- -D warnings

# Format
cargo fmt -- --check

Roadmap

  • FlowEvent v1 Contract
  • Ergonomic Builder
  • Abstract Transport (traits)
  • Non-blocking BufferedEmitter
  • Kafka/Redpanda Implementation
  • async/await Support
  • JSON Serialization
  • Unit Tests
  • Configurable retry policies
  • Circuit breaker
  • Prometheus metrics
  • Intelligent batching
  • Event compression

[!NOTE] Future versions may provide helper builders for common patterns, once enough real-world usage patterns are validated.

License

MIT OR Apache-2.0

Contributing

Contributions are welcome! Please open an issue or PR.

Support