allsource-core 0.7.3

High-performance event store core built in Rust

AllSource Core - High-Performance Event Store

AI-native event store built in Rust with columnar storage, schema validation, event replay, and stream processing


🎯 What is AllSource?

AllSource is a high-performance event store designed for modern event-sourcing and CQRS architectures. It uses a polyglot architecture:

  • Rust Core (this service): High-performance event store engine with columnar storage
  • Go Control Plane (services/control-plane): Orchestration, monitoring, and management layer

The Rust core provides fast event ingestion (up to 469K events/sec, single-threaded) and microsecond-scale indexed queries (about 12 μs per entity lookup), while the Go control plane handles cluster coordination and operational tasks.

Current Version: v0.7.1 · crates.io · docs.rs

Installation

allsource-core is distributed as a library crate via crates.io.

Add to Your Project

cargo add allsource-core@0.7

Or add to your Cargo.toml (pin to minor version for stability):

[dependencies]
# allsource-core: High-performance event store
# Pin to minor version - allows patch updates only
allsource-core = "0.7"

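Version Pinning Best Practice: We recommend "0.7" (minor version) rather than "=0.7.1" (exact) or "0" (major only). Note that in Cargo a bare "0.7.1" is a caret requirement (>=0.7.1, <0.8.0), not an exact pin; an exact pin needs the = prefix. "0.7" allows automatic patch updates while avoiding breaking changes, because Cargo treats a minor bump of a 0.x crate as breaking.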

Usage Patterns

  1. Embedded Library - Import directly into your Rust application
  2. Standalone Server - Build your own binary using the library
  3. Serverless - Deploy in AWS Lambda or similar environments

✨ Features

🚀 Core Event Store (v0.1)

  • Immutable Event Log: Append-only storage with complete audit trail
  • Time-Travel Queries: Query entity state as of any timestamp
  • Concurrent Indexing: Sharded concurrent indexing using DashMap for O(1) average-case lookups
  • Real-time Projections: Built-in materialized views with custom projection support
  • High Performance: 469K+ events/sec throughput, sub-millisecond queries
  • Type-Safe API: Strong typing with Rust's ownership system

💾 Persistence & Durability (v0.2)

  • Parquet Columnar Storage: Apache Arrow-based storage for analytics
  • Write-Ahead Log (WAL): Crash recovery with full durability guarantees
  • Snapshot System: Point-in-time snapshots with automatic optimization
  • Automatic Compaction: Background file merging for storage efficiency
  • WebSocket Streaming: Real-time event broadcasting to connected clients
  • Advanced Analytics: Event frequency, correlation analysis, statistical summaries

📋 Schema Registry (v0.5)

  • JSON Schema Validation: Enforce event contracts at ingestion time
  • Schema Versioning: Automatic version management with compatibility checking
  • Compatibility Modes: Backward, Forward, Full, or None
  • Breaking Change Prevention: Validate schema evolution before deployment
  • Subject Organization: Group schemas by domain or event type
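
To make the compatibility modes concrete, here is a deliberately simplified sketch that compares only the `required` field sets of two schema versions. It is an illustration under that assumption, not the registry's actual algorithm; the `Compatibility` enum and `is_compatible` function are hypothetical names.

```rust
use std::collections::BTreeSet;

/// The four modes listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Compatibility {
    Backward,
    Forward,
    Full,
    None,
}

/// Simplified check that looks only at the sets of required fields.
fn is_compatible(
    mode: Compatibility,
    old_required: &BTreeSet<&str>,
    new_required: &BTreeSet<&str>,
) -> bool {
    // Backward: events valid under the old schema must validate under the new one,
    // so the new schema may not require anything the old one did not.
    let backward = new_required.is_subset(old_required);
    // Forward: events valid under the new schema must validate under the old one,
    // so the new schema must still require everything the old one did.
    let forward = old_required.is_subset(new_required);
    match mode {
        Compatibility::Backward => backward,
        Compatibility::Forward => forward,
        Compatibility::Full => backward && forward,
        Compatibility::None => true,
    }
}

fn main() {
    let old: BTreeSet<&str> = ["name", "email"].iter().copied().collect();
    let new: BTreeSet<&str> = ["name", "email", "country"].iter().copied().collect();
    for mode in [Compatibility::Backward, Compatibility::Forward, Compatibility::Full, Compatibility::None] {
        println!("{:?}: {}", mode, is_compatible(mode, &old, &new));
    }
}
```

Adding a required field is the classic breaking change in Backward mode: previously stored events lack the field and no longer validate.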

🔄 Event Replay & Projections (v0.5)

  • Point-in-Time Replay: Replay events from any timestamp
  • Projection Rebuilding: Reconstruct materialized views with progress tracking
  • Batch Processing: Configurable batch sizes for optimal performance
  • Async Execution: Non-blocking background replay operations
  • Cancellable Operations: Stop replays gracefully with proper cleanup
  • Progress Metrics: Real-time statistics (events/sec, percentage complete)
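
The batching, cancellation, and progress features above can be sketched with the standard library alone. This is a synchronous illustration, not the crate's async replay engine; `replay`, `ReplayProgress`, and the use of plain `u64` values as events are assumptions made for brevity.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical progress report returned when a replay finishes or is cancelled.
#[derive(Debug, PartialEq)]
struct ReplayProgress {
    processed: usize,
    total: usize,
    cancelled: bool,
}

impl ReplayProgress {
    /// Percentage complete, in the range 0.0..=100.0.
    fn percent(&self) -> f64 {
        if self.total == 0 {
            100.0
        } else {
            self.processed as f64 * 100.0 / self.total as f64
        }
    }
}

/// Feed `events` to `apply` in batches, checking the cancel flag before each batch.
fn replay<F: FnMut(&[u64])>(
    events: &[u64],
    batch_size: usize,
    cancel: &AtomicBool,
    mut apply: F,
) -> ReplayProgress {
    let mut processed = 0;
    // `chunks` panics on a size of zero, so clamp to at least one event per batch.
    for batch in events.chunks(batch_size.max(1)) {
        if cancel.load(Ordering::Relaxed) {
            return ReplayProgress { processed, total: events.len(), cancelled: true };
        }
        apply(batch);
        processed += batch.len();
    }
    ReplayProgress { processed, total: events.len(), cancelled: false }
}

fn main() {
    let events: Vec<u64> = (1..=10).collect();
    let cancel = AtomicBool::new(false);
    let mut sum = 0u64;
    let progress = replay(&events, 4, &cancel, |batch| sum += batch.iter().sum::<u64>());
    println!("sum = {}, {:.0}% complete", sum, progress.percent());
}
```

Checking the flag only between batches keeps each batch atomic from the projection's point of view, at the cost of cancellation latency of up to one batch.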

๐Ÿ” Security & Multi-tenancy (v0.7)

  • Authentication: JWT tokens with RBAC, API keys for services
  • Authorization: Role-based access control (Admin, Operator, Reader)
  • Rate Limiting: Per-tenant configurable limits
  • IP Filtering: Global allowlist/blocklist support
  • Audit Logging: Comprehensive audit trail with PostgreSQL support
  • Tenant Isolation: Repository-level isolation with quotas
  • Serverless Ready: Graceful shutdown, PORT env var, optimized containers
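
As an illustration of per-tenant rate limiting, the sketch below uses a token bucket per tenant. The source does not say which algorithm the crate uses, so the token bucket, the `RateLimiter` and `TokenBucket` types, and the explicit `now_secs` clock parameter are all assumptions chosen to keep the example deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical token bucket: holds up to `capacity` tokens, refilled at a steady rate.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill_secs: f64,
}

impl TokenBucket {
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec, last_refill_secs: 0.0 }
    }

    /// Refill based on elapsed time, then try to spend one token.
    fn try_acquire(&mut self, now_secs: f64) -> bool {
        let elapsed = (now_secs - self.last_refill_secs).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill_secs = now_secs;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

/// One bucket per tenant, created lazily with the same limits.
struct RateLimiter {
    buckets: HashMap<String, TokenBucket>,
    capacity: f64,
    refill_per_sec: f64,
}

impl RateLimiter {
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { buckets: HashMap::new(), capacity, refill_per_sec }
    }

    fn allow(&mut self, tenant_id: &str, now_secs: f64) -> bool {
        let (capacity, refill) = (self.capacity, self.refill_per_sec);
        self.buckets
            .entry(tenant_id.to_string())
            .or_insert_with(|| TokenBucket::new(capacity, refill))
            .try_acquire(now_secs)
    }
}

fn main() {
    let mut limiter = RateLimiter::new(2.0, 1.0);
    for t in [0.0, 0.0, 0.0, 1.0] {
        println!("tenant-a at t={}: allowed={}", t, limiter.allow("tenant-a", t));
    }
}
```

Keeping a separate bucket per tenant means one noisy tenant exhausts only its own budget.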

⚡ Stream Processing (v0.5)

  • 6 Built-in Operators:
    • Filter: eq, ne, gt, lt, contains operations
    • Map: Transform field values (uppercase, lowercase, trim, math)
    • Reduce: Aggregations (count, sum, avg, min, max) with grouping
    • Window: Time-based aggregations (tumbling, sliding, session)
    • Enrich: External data lookup and enrichment
    • Branch: Conditional event routing
  • Stateful Processing: Thread-safe state management for aggregations
  • Window Buffers: Automatic time-based event eviction
  • Pipeline Statistics: Track processing metrics per pipeline
  • Integrated Processing: Events flow through pipelines during ingestion
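
A Filter operator followed by a grouped Reduce can be pictured as two small functions chained together. This is a standard-library sketch of the concept only; the `Fields` alias and the `filter_eq`, `count_by`, and `event` helpers are hypothetical and do not reflect the crate's operator types.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical flat event payload: field name -> string value.
type Fields = HashMap<String, String>;

/// Filter operator with the `eq` operation: keep events whose `field` equals `value`.
fn filter_eq<'a>(events: &'a [Fields], field: &str, value: &str) -> Vec<&'a Fields> {
    events
        .iter()
        .filter(|e| e.get(field).map(String::as_str) == Some(value))
        .collect()
}

/// Reduce operator with the `count` function, grouped by `group_by`.
/// Events that lack the grouping field are skipped.
fn count_by(events: &[&Fields], group_by: &str) -> BTreeMap<String, usize> {
    let mut counts = BTreeMap::new();
    for e in events {
        if let Some(key) = e.get(group_by) {
            *counts.entry(key.clone()).or_insert(0) += 1;
        }
    }
    counts
}

/// Helper to build an event from string pairs.
fn event(pairs: &[(&str, &str)]) -> Fields {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    let events = vec![
        event(&[("country", "US"), ("plan", "pro")]),
        event(&[("country", "DE"), ("plan", "pro")]),
        event(&[("country", "US"), ("plan", "free")]),
        event(&[("country", "US"), ("plan", "pro")]),
    ];
    let pro_users = filter_eq(&events, "plan", "pro");
    println!("pro users by country: {:?}", count_by(&pro_users, "country"));
}
```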

๐Ÿ—๏ธ Clean Architecture

The codebase follows a strict layered architecture for maintainability and testability:

┌─────────────────────────────────────────────────────────────┐
│                    Infrastructure Layer                     │
│  (HTTP handlers, WebSocket, persistence, security)          │
│  infrastructure::web, infrastructure::persistence,          │
│  infrastructure::security, infrastructure::repositories     │
├─────────────────────────────────────────────────────────────┤
│                    Application Layer                        │
│  (Use cases, services, DTOs)                                │
│  application::use_cases, application::services,             │
│  application::dto                                           │
├─────────────────────────────────────────────────────────────┤
│                      Domain Layer                           │
│  (Entities, value objects, repository traits)               │
│  domain::entities, domain::value_objects,                   │
│  domain::repositories                                       │
└─────────────────────────────────────────────────────────────┘

Module Organization:

Layer          | Path                         | Contents
---------------|------------------------------|----------------------------------------------------------
Domain         | domain::entities             | Event, Tenant, Schema, Projection, AuditEvent, EventStream
Domain         | domain::value_objects        | EntityId, TenantId, EventType, PartitionKey
Domain         | domain::repositories         | Repository trait definitions (no implementations)
Application    | application::use_cases       | IngestEvent, QueryEvents, ManageTenant, ManageSchema
Application    | application::services        | AuditLogger, Pipeline, Replay, Analytics
Application    | application::dto             | Request/Response DTOs for API boundaries
Infrastructure | infrastructure::web          | HTTP handlers, WebSocket, API routes
Infrastructure | infrastructure::persistence  | Storage, WAL, Snapshots, Compaction, Index
Infrastructure | infrastructure::security     | Auth, Middleware, Rate Limiting
Infrastructure | infrastructure::repositories | In-memory, PostgreSQL, RocksDB implementations

Dependency Rule: Inner layers never depend on outer layers. Domain has zero external dependencies.

๐Ÿ”๏ธ SierraDB-Inspired Production Patterns (NEW)

Based on battle-tested patterns from SierraDB, a production-grade event store:

PartitionKey - Fixed Partition Architecture

  • ✅ 32 fixed partitions for single-node deployment (scalable to 1024+)
  • ✅ Consistent hashing ensures same entity always maps to same partition
  • ✅ Sequential writes within partitions for ordering guarantees
  • ✅ Ready for horizontal scaling with node assignment
  • ✅ 6 comprehensive tests covering distribution and consistency
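
The core of a fixed-partition scheme is a stable hash of the entity ID reduced modulo the partition count. The sketch below uses 64-bit FNV-1a because it is simple and deterministic across runs; the hash function actually used by `PartitionKey` is not stated here, so `fnv1a_64` and `partition_for` are illustrative assumptions.

```rust
/// Number of fixed partitions for a single-node deployment, as described above.
const PARTITION_COUNT: u32 = 32;

/// 64-bit FNV-1a hash (assumed for illustration; stable across runs and platforms).
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Map an entity ID to a partition in `0..partition_count`.
fn partition_for(entity_id: &str, partition_count: u32) -> u32 {
    (fnv1a_64(entity_id.as_bytes()) % partition_count as u64) as u32
}

fn main() {
    for id in ["user-123", "user-456", "order-789"] {
        println!("{} -> partition {}", id, partition_for(id, PARTITION_COUNT));
    }
}
```

Because the partition count is fixed, an entity's partition never changes, so all of its events are written sequentially to one place and keep their order.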

EventStream - Gapless Version Guarantees

  • ✅ Watermark system tracks "highest continuously confirmed sequence"
  • ✅ Prevents gaps that would break event sourcing guarantees
  • ✅ Optimistic locking prevents concurrent modification conflicts
  • ✅ Version numbers start at 1 and increment sequentially
  • ✅ 9 tests covering versioning, concurrency, and gap detection
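
Optimistic locking with gapless versions can be sketched as an append that succeeds only when the caller's expected version matches the stream's current version. This is a minimal single-threaded illustration; the `EventStream` and `AppendError` types below are hypothetical and the watermark is not modelled.

```rust
/// Hypothetical error returned when the caller's view of the stream is stale.
#[derive(Debug, PartialEq)]
enum AppendError {
    VersionConflict { expected: u64, actual: u64 },
}

/// Minimal stream: the version of the last event equals the number of events,
/// so versions start at 1 and can never skip a number.
#[derive(Default)]
struct EventStream {
    events: Vec<String>,
}

impl EventStream {
    /// Version of the most recent event, or 0 for an empty stream.
    fn current_version(&self) -> u64 {
        self.events.len() as u64
    }

    /// Append only if `expected_version` matches the current version.
    /// Returns the version assigned to the new event.
    fn append(&mut self, expected_version: u64, payload: &str) -> Result<u64, AppendError> {
        let actual = self.current_version();
        if expected_version != actual {
            return Err(AppendError::VersionConflict { expected: expected_version, actual });
        }
        self.events.push(payload.to_string());
        Ok(actual + 1)
    }
}

fn main() {
    let mut stream = EventStream::default();
    println!("{:?}", stream.append(0, "user.created")); // Ok(1)
    println!("{:?}", stream.append(1, "user.updated")); // Ok(2)
    println!("{:?}", stream.append(1, "stale write"));  // Err(VersionConflict { .. })
}
```

A writer that receives a conflict reloads the stream, re-applies its decision against the new state, and retries with the updated version.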

EventStreamRepository - Infrastructure Implementation

  • ✅ Thread-safe in-memory implementation with parking_lot RwLock
  • ✅ Partition-aware stream storage and retrieval
  • ✅ Watermark tracking and gapless verification
  • ✅ Optimistic locking enforcement at repository level
  • ✅ 8 comprehensive tests covering all operations

StorageIntegrity - Corruption Prevention

  • ✅ SHA-256 checksums for data integrity
  • ✅ WAL segment verification
  • ✅ Parquet file integrity checking
  • ✅ Batch verification with progress reporting
  • ✅ 8 tests covering checksums and file verification
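
The verification idea is to store a checksum when a segment is sealed and recompute it on read. To stay within the standard library, the sketch below substitutes CRC-32 for the SHA-256 used by the crate; CRC-32 catches accidental corruption but, unlike SHA-256, offers no protection against deliberate tampering. The `Segment` type is hypothetical.

```rust
/// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320).
/// Stand-in for SHA-256 so the example needs no external crates.
fn crc32(bytes: &[u8]) -> u32 {
    let mut crc: u32 = 0xFFFF_FFFF;
    for &byte in bytes {
        crc ^= byte as u32;
        for _ in 0..8 {
            crc = if crc & 1 == 1 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
        }
    }
    !crc
}

/// Hypothetical sealed segment: payload bytes plus the checksum taken at seal time.
struct Segment {
    data: Vec<u8>,
    checksum: u32,
}

impl Segment {
    /// Record the checksum once, when the segment is written.
    fn seal(data: Vec<u8>) -> Self {
        let checksum = crc32(&data);
        Self { data, checksum }
    }

    /// Recompute the checksum and compare it with the stored one.
    fn verify(&self) -> bool {
        crc32(&self.data) == self.checksum
    }
}

fn main() {
    let mut segment = Segment::seal(b"wal-entry-0001".to_vec());
    println!("fresh segment ok: {}", segment.verify());
    segment.data[0] ^= 0x01; // simulate a flipped bit on disk
    println!("after corruption ok: {}", segment.verify());
}
```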

7-Day Stress Tests - Production Hardening

  • ✅ Continuous ingestion tests (7 days, 1 hour, 5 minutes configs)
  • ✅ Memory leak detection
  • ✅ Corruption detection over time
  • ✅ Partition balance monitoring
  • ✅ 4 tests + configurable long-running tests

Why These Patterns?

Pattern            | SierraDB's Lesson                          | Our Benefit
-------------------|--------------------------------------------|------------------------------------------------
Fixed Partitions   | Sequential writes enable gapless sequences | Horizontal scaling without complex coordination
Gapless Versions   | Watermark prevents data gaps               | Consistent event sourcing guarantees
Optimistic Locking | Detect concurrent modifications            | Safe concurrent access without heavy locks

Implemented:

  • ✅ Storage integrity checksums (prevent silent corruption)
  • ✅ 7-day continuous stress tests (production confidence)

Coming Next:

  • ⚡ Zero-copy deserialization (performance optimization)
  • 🔄 Batch processing optimizations
  • 🔒 Lock-free data structures

📊 Performance Benchmarks

Measured on Apple Silicon M-series (release build):

Operation                     | Throughput/Latency  | Details
------------------------------|---------------------|-------------------
Event Ingestion               | 442-469K events/sec | Single-threaded
Entity Query                  | 11.9 μs             | Indexed lookup
Type Query                    | 2.4 ms              | Cross-entity scan
State Reconstruction          | 3.5 μs              | With snapshots
State Reconstruction          | 3.8 μs              | Without snapshots
Concurrent Writes (8 workers) | 8.0 ms/batch        | 100 events/batch
Parquet Batch Write           | 3.5 ms              | 1000 events
Snapshot Creation             | 130 μs              | Per entity
WAL Sync Writes               | 413 ms              | 100 syncs

Test Coverage: 250 tests - 100% passing

  • Domain Layer: 177 tests (Value Objects, Entities, Business Rules, SierraDB Patterns)
    • PartitionKey: 6 tests (consistent hashing, distribution, node assignment)
    • EventStream: 9 tests (gapless versioning, optimistic locking, watermarks)
  • Application Layer: 20 tests (Use Cases, DTOs)
  • Infrastructure Layer: 53 tests (API, Storage, Services, Repository Implementations)
    • InMemoryEventStreamRepository: 8 tests (SierraDB pattern implementation)
    • StorageIntegrity: 8 tests (checksum verification, WAL/Parquet integrity)
    • 7-Day Stress Tests: 4 tests (long-running corruption detection)

🔧 API Endpoints (38 Total)

Core Event Store

# Health check
GET /health

# Ingest event
POST /api/v1/events

# Query events
GET /api/v1/events/query?entity_id=user-123
GET /api/v1/events/query?event_type=user.created
GET /api/v1/events/query?since=2024-01-15T00:00:00Z&limit=100

# Entity state
GET /api/v1/entities/:entity_id/state
GET /api/v1/entities/:entity_id/state?as_of=2024-01-15T10:00:00Z
GET /api/v1/entities/:entity_id/snapshot

# Statistics
GET /api/v1/stats

WebSocket Streaming (v0.2)

# Real-time event stream
WS /api/v1/events/stream

Analytics (v0.2)

# Event frequency analysis
GET /api/v1/analytics/frequency?event_type=user.created&bucket_size=3600

# Statistical summary
GET /api/v1/analytics/summary?entity_id=user-123

# Event correlation
GET /api/v1/analytics/correlation?event_a=user.created&event_b=order.placed

Snapshots (v0.2)

# Create snapshot
POST /api/v1/snapshots

# List snapshots
GET /api/v1/snapshots
GET /api/v1/snapshots?entity_id=user-123

# Get latest snapshot
GET /api/v1/snapshots/:entity_id/latest

Compaction (v0.2)

# Trigger manual compaction
POST /api/v1/compaction/trigger

# Get compaction stats
GET /api/v1/compaction/stats

Schema Registry (v0.5)

# Register schema
POST /api/v1/schemas

# List subjects
GET /api/v1/schemas

# Get schema
GET /api/v1/schemas/:subject
GET /api/v1/schemas/:subject?version=2

# List versions
GET /api/v1/schemas/:subject/versions

# Validate event
POST /api/v1/schemas/validate

# Set compatibility mode
PUT /api/v1/schemas/:subject/compatibility

Event Replay (v0.5)

# Start replay
POST /api/v1/replay

# List replays
GET /api/v1/replay

# Get progress
GET /api/v1/replay/:replay_id

# Cancel replay
POST /api/v1/replay/:replay_id/cancel

# Delete replay
DELETE /api/v1/replay/:replay_id

Stream Processing Pipelines (v0.5)

# Register pipeline
POST /api/v1/pipelines

# List pipelines
GET /api/v1/pipelines

# Get pipeline
GET /api/v1/pipelines/:pipeline_id

# Remove pipeline
DELETE /api/v1/pipelines/:pipeline_id

# Get all stats
GET /api/v1/pipelines/stats

# Get pipeline stats
GET /api/v1/pipelines/:pipeline_id/stats

# Reset pipeline state
PUT /api/v1/pipelines/:pipeline_id/reset

๐Ÿ“ Project Structure (Clean Architecture)

Following Clean Architecture principles with clear separation of concerns:

services/core/src/
├── main.rs                    # Application entry point
├── lib.rs                     # Library exports
├── error.rs                   # Error types and Result
│
├── domain/                    # 🏛️ DOMAIN LAYER (Business Logic)
│   ├── entities/              # Core business entities
│   │   ├── event.rs          # Event entity (162 tests)
│   │   ├── tenant.rs         # Multi-tenancy entity
│   │   ├── schema.rs         # Schema registry entity
│   │   ├── projection.rs     # Projection entity
│   │   └── event_stream.rs   # 🆕 Gapless event stream (9 tests)
│   └── value_objects/         # Self-validating value objects
│       ├── tenant_id.rs      # Tenant identifier
│       ├── event_type.rs     # Event type validation
│       ├── entity_id.rs      # Entity identifier
│       └── partition_key.rs  # 🆕 Fixed partitioning (6 tests)
│
├── application/               # 🎯 APPLICATION LAYER (Use Cases)
│   ├── dto/                   # Data Transfer Objects
│   │   ├── event_dto.rs      # Event request/response DTOs
│   │   ├── tenant_dto.rs     # Tenant DTOs
│   │   ├── schema_dto.rs     # Schema DTOs
│   │   └── projection_dto.rs # Projection DTOs
│   └── use_cases/             # Application business logic
│       ├── ingest_event.rs   # Event ingestion (3 tests)
│       ├── query_events.rs   # Event queries (4 tests)
│       ├── manage_tenant.rs  # Tenant management (5 tests)
│       ├── manage_schema.rs  # Schema operations (4 tests)
│       └── manage_projection.rs # Projection ops (4 tests)
│
└── infrastructure/            # 🔧 INFRASTRUCTURE LAYER (Technical)
    ├── repositories/         # 🆕 Repository implementations (SierraDB)
    │   └── in_memory_event_stream_repository.rs  # Thread-safe, partitioned (8 tests)
    ├── persistence/          # 🆕 Storage integrity (SierraDB)
    │   └── storage_integrity.rs  # Checksum verification (8 tests)
    ├── api.rs                # REST API endpoints (38 endpoints)
    ├── store.rs              # Event store implementation
    ├── storage.rs            # Parquet columnar storage
    ├── wal.rs                # Write-ahead log
    ├── snapshot.rs           # Snapshot management
    ├── compaction.rs         # Storage compaction
    ├── index.rs              # High-performance indexing
    ├── projection.rs         # Real-time projections
    ├── analytics.rs          # Analytics engine
    ├── websocket.rs          # WebSocket streaming
    ├── schema.rs             # Schema validation service
    ├── replay.rs             # Event replay engine
    ├── pipeline.rs           # Stream processing
    ├── backup.rs             # Backup management
    ├── auth.rs               # Authentication/Authorization
    ├── rate_limit.rs         # Rate limiting
    ├── tenant.rs             # Tenant service
    ├── metrics.rs            # Metrics collection
    ├── middleware.rs         # HTTP middleware
    ├── config.rs             # Configuration
    ├── tenant_api.rs         # Tenant API handlers
    └── auth_api.rs           # Auth API handlers

tests/
├── integration_tests.rs      # End-to-end tests
└── stress_tests/             # 🆕 Long-running stress tests (SierraDB)
    └── seven_day_stress.rs   # 7-day corruption detection (4 tests)

benches/
└── performance_benchmarks.rs # Performance benchmarks

๐Ÿ—๏ธ Clean Architecture Benefits

Domain Layer (Core Business Logic)

  • ✅ Pure business rules with zero external dependencies
  • ✅ Value Objects enforce invariants at construction time
  • ✅ Entities contain rich domain behavior
  • ✅ SierraDB patterns for production-grade event streaming
  • ✅ 177 comprehensive domain tests (including 15 SierraDB tests)

Application Layer (Orchestration)

  • ✅ Use Cases coordinate domain entities
  • ✅ DTOs isolate external contracts from domain
  • ✅ Clear input/output boundaries
  • ✅ 20 use case tests covering all scenarios

Infrastructure Layer (Technical Details)

  • ✅ Pluggable implementations (can swap storage, APIs)
  • ✅ Framework and library dependencies isolated
  • ✅ Domain and Application layers remain pure
  • ✅ 37 infrastructure integration tests

🚀 Quick Start

Option 1: Use as a Library (Recommended)

Add to your Cargo.toml:

[dependencies]
allsource-core = "0.7"  # Pin to minor version

Or:

cargo add allsource-core@0.7

Option 2: Build from Source

# Clone the repository
git clone https://github.com/all-source-os/chronos-monorepo.git
cd chronos-monorepo/apps/core

# Build
cargo build --release

# Run tests
cargo test

# Run benchmarks
cargo bench

Running the Server

# Development mode
cargo run

# Production mode (optimized)
cargo run --release

# With debug logging
RUST_LOG=debug cargo run

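# Custom port: set the PORT environment variable (v0.7+) or modify main.rs
# Default: 0.0.0.0:8080; the curl examples in this README assume port 3900
PORT=3900 cargo run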

Example: Ingest Events

curl -X POST http://localhost:3900/api/v1/events \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "user.created",
    "entity_id": "user-123",
    "payload": {
      "name": "Alice",
      "email": "alice@example.com"
    }
  }'

Example: Query Events

# Get all events for an entity
curl "http://localhost:3900/api/v1/events/query?entity_id=user-123"

# Time-travel query
curl "http://localhost:3900/api/v1/events/query?entity_id=user-123&as_of=2024-01-15T10:00:00Z"

# Query by type
curl "http://localhost:3900/api/v1/events/query?event_type=user.created&limit=10"

Example: Register Schema (v0.5)

curl -X POST http://localhost:3900/api/v1/schemas \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "user.created",
    "schema": {
      "type": "object",
      "required": ["name", "email"],
      "properties": {
        "name": {"type": "string"},
        "email": {"type": "string", "format": "email"}
      }
    }
  }'

Example: Start Replay (v0.5)

curl -X POST http://localhost:3900/api/v1/replay \
  -H "Content-Type: application/json" \
  -d '{
    "projection_name": "user_snapshot",
    "from_timestamp": "2024-01-01T00:00:00Z",
    "config": {
      "batch_size": 1000,
      "emit_progress": true
    }
  }'

Example: Create Pipeline (v0.5)

curl -X POST http://localhost:3900/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "user_analytics",
    "source_event_types": ["user.created", "user.updated"],
    "operators": [
      {
        "type": "filter",
        "field": "country",
        "value": "US",
        "op": "eq"
      },
      {
        "type": "reduce",
        "field": "id",
        "function": "count",
        "group_by": "country"
      }
    ],
    "enabled": true
  }'

✅ Quality Gates

AllSource Core enforces strict quality gates to maintain code quality, consistency, and reliability.

See: QUALITY_GATES.md for complete documentation.

Quick Start

# Run all quality gates (recommended before commit)
make check

# Auto-fix formatting and sorting
make format
make format-sort

# Full CI pipeline locally
make ci

Quality Checks Enforced

  • ✅ Code Formatting (rustfmt) - Consistent style across codebase
  • ✅ Code Quality (clippy) - Zero warnings, catch anti-patterns
  • ✅ Dependency Sorting (cargo-sort) - Alphabetically sorted Cargo.toml
  • ✅ Test Coverage - All tests must pass
  • ✅ Build Verification - Release build must succeed

CI/CD Integration

Quality gates run automatically on:

  • Every push to main or develop
  • Every pull request
  • Enforced before merge

Workflow: .github/workflows/rust-quality.yml

Configuration Files

  • .clippy.toml - Clippy linter configuration (MSRV: 1.70.0)
  • rustfmt.toml - Code formatting rules (max width: 100)
  • cargo-sort.toml - Dependency sorting configuration
  • Makefile - Quality gate commands

🧪 Testing

# Run all tests
cargo test

# Run unit tests only
cargo test --lib

# Run integration tests
cargo test --test integration_tests

# Run specific test
cargo test test_replay_progress_tracking

# Run with output
cargo test -- --nocapture

# Run with logging
RUST_LOG=debug cargo test

📊 Benchmarking

# Run all benchmarks
cargo bench

# Run specific benchmark suite
cargo bench ingestion_throughput
cargo bench query_performance
cargo bench state_reconstruction
cargo bench concurrent_writes

# View results
open target/criterion/report/index.html

๐Ÿ—๏ธ Architecture

System Architecture

AllSource uses a polyglot architecture with specialized services:

┌─────────────────────────────────────────┐
│   Go Control Plane (Port 8081)          │
│   • Cluster Management                  │
│   • Metrics Aggregation                 │
│   • Operation Orchestration             │
│   • Health Monitoring                   │
└─────────────────────────────────────────┘
              │ HTTP Client
              ▼
┌─────────────────────────────────────────┐
│   Rust Event Store (Port 8080)          │
│   • Event Ingestion (469K/sec)          │
│   • Query Engine (<12μs)                │
│   • Schema Registry                     │
│   • Stream Processing                   │
│   • Event Replay                        │
└─────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────┐
│   Storage Layer                         │
│   • Parquet (Columnar)                  │
│   • WAL (Durability)                    │
│   • Snapshots (Point-in-time)           │
│   • In-Memory Indexes                   │
└─────────────────────────────────────────┘

Event Flow

When an event is ingested:

  1. Validation - Check event integrity
  2. WAL Write - Durable write-ahead log (v0.2)
  3. Indexing - Update entity/type indexes
  4. Projections - Update materialized views
  5. Pipelines - Real-time stream processing (v0.5)
  6. Parquet Storage - Columnar persistence (v0.2)
  7. In-Memory Store - Fast access layer
  8. WebSocket Broadcast - Real-time streaming (v0.2)
  9. Auto-Snapshots - Create snapshots if needed (v0.2)
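
The ordering of the first steps matters: an event is validated, then recorded in the write-ahead log, and only then made visible through indexes and the in-memory store. The sketch below illustrates that ordering with in-memory stand-ins; the `Store` and `Event` types and the pipe-separated WAL record are assumptions for illustration, and the projection, pipeline, Parquet, WebSocket, and snapshot steps are omitted.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified event.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    entity_id: String,
    event_type: String,
    payload: String,
}

#[derive(Default)]
struct Store {
    wal: Vec<String>,                       // stand-in for a durable write-ahead log
    events: Vec<Event>,                     // in-memory store
    by_entity: HashMap<String, Vec<usize>>, // entity_id -> offsets into `events`
}

impl Store {
    /// Ingest one event and return its offset in the in-memory store.
    fn ingest(&mut self, event: Event) -> Result<usize, String> {
        // Step 1: validation. Rejected events leave no trace anywhere.
        if event.entity_id.is_empty() || event.event_type.is_empty() {
            return Err("entity_id and event_type must be non-empty".to_string());
        }
        // Step 2: WAL write, before any in-memory state changes.
        self.wal.push(format!("{}|{}|{}", event.entity_id, event.event_type, event.payload));
        // Step 3: indexing.
        let offset = self.events.len();
        self.by_entity.entry(event.entity_id.clone()).or_default().push(offset);
        // Step 7: in-memory store.
        self.events.push(event);
        Ok(offset)
    }
}

fn main() {
    let mut store = Store::default();
    let event = Event {
        entity_id: "user-123".to_string(),
        event_type: "user.created".to_string(),
        payload: "{\"name\":\"Alice\"}".to_string(),
    };
    println!("ingested at offset {:?}", store.ingest(event));
    println!("WAL records: {}", store.wal.len());
}
```

Writing the WAL record first means a crash between steps can be repaired by replaying the log, whereas the reverse order could expose events that were never made durable.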

Storage Architecture

Storage Layer:
├── In-Memory Events (Vec<Event>)
├── DashMap Indexes (entity_id, event_type)
├── Parquet Files (columnar storage)
├── WAL Segments (append-only logs)
└── Snapshots (point-in-time state)

Concurrency Model

  • Concurrent Indexes: DashMap (internally sharded locks) for entity/type lookups
  • RwLock: parking_lot RwLock for event list
  • Atomic Counters: Lock-free statistics tracking
  • Async Runtime: Tokio for background tasks (replay, compaction)
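
The combination of a read-write lock around the event list and atomic counters for statistics can be sketched as follows. The example uses `std::sync::RwLock` rather than parking_lot so that it runs without external crates (hence the explicit poisoning handling), and `SharedStore` and `run_writers` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical shared store: a locked event list plus a lock-free counter.
#[derive(Default)]
struct SharedStore {
    events: RwLock<Vec<String>>,
    ingested: AtomicU64,
}

impl SharedStore {
    /// Writers take the exclusive lock briefly, then bump the counter atomically.
    fn ingest(&self, event: String) {
        self.events.write().expect("lock poisoned").push(event);
        self.ingested.fetch_add(1, Ordering::Relaxed);
    }

    /// Readers share the lock and do not block each other.
    fn len(&self) -> usize {
        self.events.read().expect("lock poisoned").len()
    }
}

/// Spawn `workers` threads that each ingest `per_worker` events.
fn run_writers(workers: usize, per_worker: usize) -> Arc<SharedStore> {
    let store = Arc::new(SharedStore::default());
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                for i in 0..per_worker {
                    store.ingest(format!("worker-{}-event-{}", w, i));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    store
}

fn main() {
    let store = run_writers(8, 100);
    println!("events stored: {}", store.len());
    println!("counter: {}", store.ingested.load(Ordering::Relaxed));
}
```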

🎯 Roadmap

✅ v0.1 - Core Event Store (COMPLETED)

  • In-memory event storage
  • DashMap-based indexing
  • Entity state reconstruction
  • Basic projections
  • REST API
  • Query by entity/type/time

✅ v0.2 - Persistence & Durability (COMPLETED)

  • Parquet columnar storage
  • Write-ahead log (WAL)
  • Snapshot system
  • Automatic compaction
  • WebSocket streaming
  • Advanced analytics

✅ v0.5 - Schema & Processing (COMPLETED)

  • Schema registry with versioning
  • Event replay engine
  • Projection rebuilding
  • Stream processing pipelines
  • Stateful aggregations
  • Window operations

✅ v0.6 - Clean Architecture Refactoring (PHASE 2 COMPLETED)

  • Phase 1: Domain Layer (162 tests)
    • Value Objects (TenantId, EventType, EntityId)
    • Domain Entities (Event, Tenant, Schema, Projection)
    • Business rule enforcement
    • Self-validating types
  • Phase 2: Application Layer (20 tests)
    • DTOs for all operations
    • Tenant management use cases
    • Schema management use cases
    • Projection management use cases
    • Clean separation from domain
  • Phase 3: Infrastructure Layer (IN PROGRESS)
    • Repository pattern implementation
    • API layer refactoring
    • Service layer extraction
    • Dependency injection

✅ v0.7 - Security & Cloud-Native (COMPLETED)

  • Security Infrastructure
    • JWT authentication with role-based access control (RBAC)
    • API key authentication for service-to-service
    • Rate limiting per tenant
    • IP filtering (global allowlist/blocklist)
    • Request ID tracking for audit trail
  • Multi-tenancy with Quotas
    • Tenant isolation at repository level
    • Configurable quotas (events/day, storage, rate limits)
    • Tenant activation/deactivation
  • Audit Logging
    • Comprehensive audit events for all operations
    • PostgreSQL audit repository
    • In-memory audit repository for testing
  • Serverless & Cloud Support
    • Graceful shutdown (SIGTERM handling)
    • PORT environment variable support
    • Fly.io deployment configurations
    • Google Cloud Run configurations
    • Helm charts for Kubernetes
  • Performance Optimizations
    • Arena-based memory pooling (2-5ns allocations)
    • SIMD JSON parsing
    • Lock-free batch processing

📋 v0.8 - Advanced Features (PLANNED)

  • Multi-tenancy support (Complete)
  • Audit logging (Complete)
  • Event encryption at rest
  • Retention policies
  • Data archival
  • Backup/restore enhancements

๐ŸŒ v1.0 - Distributed & Cloud-Native (PLANNED)

  • Distributed replication
  • Multi-region support
  • Consensus protocol (Raft)
  • Arrow Flight RPC
  • Kubernetes operators
  • Cloud-native deployment
  • Horizontal scaling
  • Load balancing

🔮 Future Enhancements (BACKLOG)

  • GraphQL API
  • WASM plugin system
  • Change Data Capture (CDC)
  • Time-series optimization
  • Machine learning integrations
  • Real-time anomaly detection
  • Event sourcing templates
  • Visual query builder

🔬 Technical Decisions

Why Rust?

  • Performance: Zero-cost abstractions, no GC pauses
  • Safety: Ownership model prevents data races
  • Concurrency: Fearless concurrency with Send/Sync
  • Ecosystem: Excellent libraries (Tokio, Axum, Arrow)

Why DashMap?

  • Concurrent HashMap built on sharded locks (low contention, though not strictly lock-free)
  • Better than RwLock<HashMap> for reads
  • Sharded internally for minimal contention
  • O(1) lookups with concurrent access

Why Apache Arrow/Parquet?

  • Industry-standard columnar format
  • Zero-copy data access
  • SIMD-accelerated operations
  • Excellent compression ratios
  • Interoperable with DataFusion, Polars, DuckDB

Why Tokio + Axum?

  • High-performance async runtime
  • Type-safe request handlers
  • Excellent ecosystem integration
  • Low overhead, fast routing

Why parking_lot?

  • Smaller and faster than std::sync::RwLock
  • No poisoning - simpler error handling
  • Better performance under contention
  • Widely used in production Rust

📚 Usage Examples

Programmatic API

// Requires `serde_json` and `chrono` as direct dependencies alongside allsource-core.
use allsource_core::{Event, EventStore, QueryEventsRequest};
use serde_json::json;

// The `?` operator needs a function that returns a Result.
fn main() -> allsource_core::Result<()> {
    // Create store
    let store = EventStore::new();

    // Ingest events
    let event = Event::new(
        "user.created".to_string(),
        "user-123".to_string(),
        json!({
            "name": "Alice",
            "email": "alice@example.com"
        })
    );
    store.ingest(event)?;

    // Query events
    let request = QueryEventsRequest {
        entity_id: Some("user-123".to_string()),
        ..Default::default()
    };
    let events = store.query(request)?;

    // Reconstruct state
    let state = store.reconstruct_state("user-123", None)?;
    println!("Current state: {}", state);

    // Time-travel query
    let timestamp = chrono::Utc::now() - chrono::Duration::hours(1);
    let past_state = store.reconstruct_state("user-123", Some(timestamp))?;
    println!("State 1 hour ago: {}", past_state);

    Ok(())
}

Custom Projection

use allsource_core::projection::Projection;
use allsource_core::event::Event;
use serde_json::{json, Value}; // `json!` is used in get_state below
use parking_lot::RwLock;
use std::collections::HashMap;

struct RevenueProjection {
    totals: RwLock<HashMap<String, f64>>,
}

impl Projection for RevenueProjection {
    fn name(&self) -> &str {
        "revenue_by_customer"
    }

    fn process(&self, event: &Event) -> allsource_core::Result<()> {
        if event.event_type == "order.completed" {
            if let Some(amount) = event.payload["amount"].as_f64() {
                let mut totals = self.totals.write();
                *totals.entry(event.entity_id.clone()).or_insert(0.0) += amount;
            }
        }
        Ok(())
    }

    fn get_state(&self, entity_id: &str) -> Option<Value> {
        self.totals.read()
            .get(entity_id)
            .map(|total| json!({ "total_revenue": total }))
    }

    fn clear(&self) {
        self.totals.write().clear();
    }
}

๐Ÿ› Troubleshooting

Port Already in Use

# Find process using port 8080
lsof -i :8080

# Kill process
kill -9 <PID>

Slow Performance

# Always use release mode for benchmarks
cargo run --release
cargo bench

# Check system resources
top -o cpu

Memory Issues

# Monitor memory usage
RUST_LOG=allsource_core=debug cargo run --release

# Reduce batch sizes in config
# Adjust snapshot_config.max_events_before_snapshot

Test Failures

# Clean build
cargo clean
cargo test

# Check for stale processes
killall allsource-core

# Verbose test output
cargo test -- --nocapture --test-threads=1

📖 Resources

๐Ÿค Contributing

Contributions are welcome! Areas of interest:

  • Performance optimizations
  • Additional projection types
  • Query optimization
  • Documentation improvements
  • Bug fixes and tests

📄 License

MIT License - see LICENSE file for details


AllSource Core - Event sourcing, done right

Built with 🦀 Rust | Clean Architecture | Made for Production

Version 0.7.1 | 469K events/sec | 470+ tests passing | Security & Cloud-Native