Majra
مجرا (Arabic/Persian: conduit, channel) — Distributed queue & multiplex engine for Rust
Majra provides shared messaging primitives for the AGNOS ecosystem, eliminating duplicate pub/sub, queue, relay, and heartbeat implementations across AgnosAI, Ifran, SecureYeoman, and daimon.
Pure Rust, async-native — built on tokio, zero-copy where possible.
Features
| Module | Feature | Description |
|---|---|---|
| pubsub | pubsub |
Topic-based pub/sub with MQTT-style */# wildcard matching |
| pubsub | pubsub |
TypedPubSub<T> — generic, type-safe pub/sub with backpressure, replay, and filters |
| queue | queue |
Multi-tier priority queue (5 levels) with DAG dependency scheduling |
| queue | queue |
ManagedQueue<T> — resource-aware, lifecycle-tracked, concurrent job queue |
| relay | relay |
Sequenced, deduplicated inter-node message relay |
| transport | relay |
Pluggable transport trait + multiplexed connection pool |
| ipc | ipc |
Length-prefixed framing over Unix domain sockets |
| heartbeat | heartbeat |
TTL-based node health: Online → Suspect → Offline with GPU telemetry and fleet stats |
| ratelimit | ratelimit |
Per-key token bucket rate limiter with stale-key eviction and stats |
| barrier | barrier |
N-way barrier sync with deadlock recovery + async arrive_and_wait() |
| metrics | always | MajraMetrics trait — wire to Prometheus/OpenTelemetry |
Default features: pubsub, queue, relay, heartbeat.
Optional: ipc, ratelimit, barrier, sqlite (persistent queue backing).
Quick Start
[]
= "0.21"
Typed Pub/Sub
use ;
let hub = new;
// Subscribe with a filter — only high-priority events.
let mut rx = hub.subscribe_filtered;
hub.publish;
let msg = rx.recv.await.unwrap;
Managed Queue (GPU-aware scheduling)
use ;
use Duration;
let queue = new;
// Enqueue a GPU training job.
let id = queue.enqueue.await;
// Dequeue only what fits available resources.
let pool = ResourcePool ;
if let Some = queue.dequeue.await
Fleet Heartbeat with GPU Telemetry
use ;
let tracker = default;
tracker.register_with_telemetry;
let stats = tracker.fleet_stats;
println!;
Async Barrier
use AsyncBarrierSet;
let barriers = new;
barriers.create;
// Each worker awaits the barrier — released when all arrive.
barriers.arrive_and_wait.await?;
Priority Queue
use ;
let mut q = new;
q.enqueue;
q.enqueue;
assert_eq!;
Message Relay
use Relay;
let relay = new;
let mut rx = relay.subscribe;
relay.broadcast;
Rate Limiter with Stats
use RateLimiter;
let limiter = new; // 100 req/s, burst 50
limiter.check;
let stats = limiter.stats;
// Evict idle keys to prevent unbounded memory growth.
limiter.evict_stale;
Ecosystem
Majra unifies patterns from battle-tested implementations:
| Consumer | What majra replaces |
|---|---|
| Ifran | Job scheduler (PriorityQueue), fleet heartbeat, rate limiting |
| SecureYeoman | EventDispatcher, A2A heartbeat, sliding-window rate limiter, DAG workflow, swarm barriers (~3,200 lines) |
| AgnosAI | Priority DAG scheduling, pub/sub wildcards, relay dedup, barrier sync |
| daimon | Topic routing, fleet relay, IPC framing |
License
AGPL-3.0-only