cal-redis 0.1.73

Callable Redis Implementation
Documentation

Cal-Redis: Multi-tenant Caching Layer for Callable Platform

Overview

Cal-Redis is a dual-layer caching system that combines local in-memory caching (using Moka) with Redis for distributed caching. It provides a unified, type-safe API for accessing Callable platform data with automatic fallback mechanisms and real-time cache synchronization via Redis pub/sub.

Key Features

  • Dual-layer caching: Local (Moka) + Remote (Redis) for optimal performance
  • Multi-tenant support: Complete data isolation with account-scoped keys
  • Real-time synchronization: Redis pub/sub keeps caches synchronized across instances
  • Type-safe API: Strongly typed interfaces for all domain objects
  • Automatic cache invalidation: Event-driven updates via Redis events
  • Session management: Built-in session handling with TTL support
  • WebSocket routing: Support for account-isolated WebSocket connections

Architecture

┌─────────────────┐         ┌─────────────────┐
│   Application   │         │   Application   │
│  (rust-softphone)│         │  (rust-softphone)│
└────────┬────────┘         └────────┬────────┘
         │                           │
         ▼                           ▼
┌─────────────────┐         ┌─────────────────┐
│   CallableCache │         │   CallableCache │
├─────────────────┤         ├─────────────────┤
│  Local Cache    │         │  Local Cache    │
│    (Moka)       │         │    (Moka)       │
├─────────────────┤         ├─────────────────┤
│  Remote Cache   │◄────────┤  Remote Cache   │
│    (Redis)      │  Pub/Sub│    (Redis)      │
└─────────────────┘         └─────────────────┘
         │                           │
         └──────────┬────────────────┘
                    ▼
            ┌─────────────┐
            │    Redis    │
            │   (Valkey)  │
            └─────────────┘

Project Structure

cal-redis/
├── src/
│   ├── lib.rs              # Public API exports
│   ├── cache.rs            # Main CallableCache implementation (~900 lines)
│   ├── constants.rs        # All Redis key patterns and builders
│   ├── common.rs           # Shared utilities for serialization
│   ├── local_cache.rs      # Moka-based local cache implementation
│   ├── redis_cache.rs      # Redis connection wrapper
│   ├── helpers.rs          # RedisHelpers trait for common operations
│   ├── account.rs          # Account-related Redis operations
│   ├── region.rs           # Region-related Redis operations
│   ├── proxy.rs            # Proxy-related Redis operations
│   └── user.rs             # User-related Redis operations
├── Cargo.toml
└── README.md

Redis Key Patterns

All keys follow a consistent pattern with the cal: prefix and account isolation where applicable:

Global Keys (not account-scoped)

cal:regions                           # All regions
cal:region:idents                     # Region identifier mappings
cal:proxies                          # All proxies
cal:accounts                         # All account summaries
cal:account:idents                   # Account identifier mappings
cal:users                            # All users
cal:user:idents                      # User identifier mappings
cal:trunk:{trunk_ip}                 # Trunk to account mappings
cal:ws:connections                   # WebSocket connection mappings
cal:events                           # Global event channel

Account-Scoped Keys

cal:account:{account_id}:devices                        # Account devices
cal:account:{account_id}:device:idents                  # Device identifier mappings
cal:account:{account_id}:ddis                          # Account DDIs
cal:account:{account_id}:trunks                        # Account trunks
cal:account:{account_id}:hooks                         # Account hooks
cal:account:{account_id}:assets                        # Account assets
cal:account:{account_id}:addresses                     # Account addresses
cal:account:{account_id}:agent:status:{user_id}        # Agent status
cal:account:{account_id}:agents:by_user                # Agent name mappings
cal:account:{account_id}:agents:registered             # Registered agents set
cal:account:{account_id}:agents:available              # Available agents set
cal:account:{account_id}:agents:connected              # Connected agents set
cal:account:{account_id}:session:{session_id}          # Session data
cal:account:{account_id}:sessions:active               # Active sessions set
cal:account:{account_id}:queue:{queue_name}            # Queue (sorted set)
cal:account:{account_id}:queues:meta                   # Queue metadata
cal:account:{account_id}:conversation:{conversation_id} # Conversation data
cal:account:{account_id}:events                        # Account event channel

Core Components

CallableCache

The main entry point providing a unified caching interface:

pub struct CallableCache {
    pub local_cache: LocalCache,    // Moka-based in-memory cache
    pub remote_cache: RedisCache,   // Redis connection wrapper
}

Access Patterns

  1. Cache-first (default): Checks local cache, then Redis

    let user = cache.get_user_by_id("user123").await?;
    
  2. Direct Redis: Bypasses local cache for transient data

    let session = cache.session_get(&account_id, &session_id).await?;
    
  3. Raw Redis connection: For custom operations

    let mut con = cache.redis_connection();
    con.xadd("stream:events", "*", &[("event", "login")]).await?;
    

LocalCache

Uses Moka for efficient in-memory caching with TTL:

  • Regions: 5MB, 60s TTL
  • Accounts: 50MB, 60s TTL
  • Users: 20MB, 60s TTL
  • Devices: 20MB, 60s TTL
  • Flow states: 50MB, 4 hours TTL

Event System

Redis pub/sub events automatically update local caches:

  • AccountCreate/Update/Delete/Sync
  • RegionCreate/Update/Delete/Sync
  • UserCreate/Update/Delete/Sync

Key Features Implementation

Multi-tenancy

All operations are account-isolated:

// Sessions include account_id in the key
cache.session_set(&account_id, &session_data).await?;

// Agent status is account-scoped
cache.agent_status_set(&account_id, &agent_status).await?;

// Queries are account-filtered
let agents = cache.agents_get_available(&account_id).await?;

Session Management

Uses SessionData struct with automatic expiry:

pub struct SessionData {
    pub session_id: String,
    pub user_id: String,
    pub agent_id: String,
    pub account_id: String,
    pub created_at: DateTime<Utc>,
    pub expires_at: DateTime<Utc>,
}

Agent Status Tracking

Complete agent state management:

pub struct AgentStatus {
    pub user_id: String,
    pub account_id: String,
    pub agent_name: String,
    pub agent_status: String,
    pub registration: Option<UserRegistration>,
    pub user_call_status: Option<UserCallStatus>,
}

WebSocket Routing

Maps WebSocket connections to accounts for targeted messaging:

// Track connection
cache.add_connected_agent(&account_id, &user_id, &agent_id, &ws_conn_id).await?;

// Get all connections for an account
let connections = cache.get_account_connections(&account_id).await?;

Usage Examples

Basic Operations

use cal_redis::{CallableCache, RedisHelpers};

// Initialize cache
let cache = CallableCache::new().await;

// Get user (cache-first)
let user = cache.get_user_by_id("user123").await?;

// Create session
let session = cache.create_session(&account_id, &user_id, &agent_id, 3600).await?;

// Update agent status
let status = AgentStatus { /* ... */ };
cache.set_agent_status(&account_id, &status).await?;

// Queue operations
cache.queue_add(&account_id, "support", "ticket123", 1.0).await?;
let next = cache.queue_pop(&account_id, "support").await?;

Advanced Patterns

// Batch operations
let users = cache.get_users_batch(&["id1", "id2", "id3"]).await?;

// Direct Redis for custom operations
let mut con = cache.redis_connection();
con.xadd("custom:stream", "*", &[("data", "value")]).await?;

// Account-specific pub/sub
cache.publish_account_event(&account_id, &event).await?;

Environment Variables

# Required
CAL_VALKEY_HOST=redis://localhost:6379

# Optional
RUST_LOG=cal_redis=debug

Dependencies

  • redis: Redis client with async support
  • moka: High-performance in-memory cache
  • serde: Serialization/deserialization
  • chrono: Date/time handling
  • tokio: Async runtime
  • cal-core: Domain models and types

Error Handling

All operations return Result<T, RedisError> with proper error conversion:

  • Serialization errors → RedisError
  • Network errors → RedisError
  • Validation errors → RedisError

Performance Characteristics

  • Local cache hit: ~0.001ms
  • Redis cache hit: ~1-5ms
  • Cache miss + Redis fetch: ~2-10ms
  • Event propagation: ~1-2ms

Migration Notes

When migrating from direct Redis usage:

  1. Replace direct Redis connections with CallableCache
  2. Update key patterns to use constants
  3. Add account_id to all tenant-specific operations
  4. Use SessionData struct instead of raw session storage
  5. Implement RedisHelpers trait for convenience methods

Future Enhancements

  • Redis Cluster support
  • Metrics and monitoring
  • Cache warming strategies
  • Selective cache invalidation
  • Redis Streams for event sourcing
  • Backup and restore utilities

Related Projects

  • cal-core: Domain models and types
  • rust-softphone-server: WebSocket server using cal-redis

Contributing

When adding new features:

  1. Follow the existing key pattern conventions
  2. Ensure multi-tenant isolation
  3. Add both cache-first and direct access methods
  4. Update constants.rs with new key patterns
  5. Implement proper error conversion
  6. Add event handlers if cache invalidation is needed