Skip to main content

Crate hyli_bus

Crate hyli_bus 

Source
Expand description

§hyli-bus — Modular Monolith with Microservice Advantages

hyli-bus is an async, in-process message bus that lets you structure your application as a set of loosely-coupled modules (think micro-services) while keeping them all in a single binary (think monolith).

§Design goals

GoalApproach
Clear module boundariesEach module declares its message contract with module_bus_client!
Simple operationsEverything runs in one binary — no network, no service discovery
Compile-time safetyRust’s type system enforces that senders and receivers agree on types
No serialisation overheadMessages are cloned in-process, not marshalled to bytes
Easy testingSpin up only the module(s) under test and inject typed events directly

§Quick start

§1. Define your message types

Any type that implements BusMessage can travel on the bus.

#[derive(Clone)]
struct OrderPlaced { order_id: u64 }

#[derive(Clone)]
struct QueryNextBatch;

#[derive(Clone)]
struct Batch(Vec<u64>);

impl hyli_bus::BusMessage for OrderPlaced {}
impl hyli_bus::BusMessage for QueryNextBatch {}
impl hyli_bus::BusMessage for Batch {}

§2. Declare a module’s contract

module_bus_client! generates a strongly-typed struct that owns exactly the senders and receivers declared — nothing more, nothing less.

use hyli_bus::module_bus_client;

module_bus_client! {
    struct ProcessorBusClient {
        sender(OrderPlaced),          // events this module emits
        receiver(OrderPlaced),        // events this module consumes
        query(QueryNextBatch, Batch), // synchronous request/response
    }
}

ShutdownModule and PersistModule receivers are added automatically by the macro.

§3. Implement the module

use hyli_bus::{Module, SharedMessageBus, module_handle_messages};

struct Processor {
    bus: ProcessorBusClient,
    // ... your state
}

impl Module for Processor {
    type Context = (); // build-time configuration

    async fn build(bus: SharedMessageBus, _ctx: ()) -> anyhow::Result<Self> {
        Ok(Self { bus: ProcessorBusClient::new_from_bus(bus).await })
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        module_handle_messages! {
            on_self self,

            listen<OrderPlaced> ev => { /* handle event */ }

            command_response<QueryNextBatch, Batch> q => {
                Ok(Batch(vec![]))
            }
        }
        Ok(())
    }
}

§4. Wire it all together

use hyli_bus::{SharedMessageBus, ModulesHandler, ModulesHandlerOptions};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let bus = SharedMessageBus::new();
    let mut handler = ModulesHandler::new(&bus, "data".into(), ModulesHandlerOptions::default())?;

    // Build all modules before starting — guarantees no message is lost.
    handler.build_module::<Processor>(()).await?;

    handler.start_modules().await?;
    // Run until SIGINT / SIGTERM.
    handler.exit_process().await
}

§Architecture overview

 ┌────────────────────────────────────────────────┐
 │               SharedMessageBus                 │
 │   Arc<Mutex<AnyMap<broadcast::Sender<M>>>>     │
 └─────────┬──────────────────────────┬───────────┘
           │ subscribe                │ subscribe
 ┌─────────▼──────────┐   ┌──────────▼──────────┐
 │   Module A         │   │   Module B           │
 │  (MempoolBusClient)│   │  (ConsensusBusClient)│
 │   sender(EventA)   │──▶│   receiver(EventA)   │
 └────────────────────┘   └─────────────────────-┘
  • Each message type gets one tokio::sync::broadcast channel, created on first use.
  • Cloning a bus handle gives access to the same underlying channels.
  • All communication is in-process: zero network hops, zero serialisation.

§Modules

Re-exports§

pub use bus::BusClientReceiver;
pub use bus::BusClientSender;
pub use bus::BusEnvelope;
pub use bus::BusMessage;
pub use bus::BusReceiver;
pub use bus::BusSender;
pub use bus::SharedMessageBus;
pub use bus::DEFAULT_CAPACITY;
pub use bus::LOW_CAPACITY;
pub use modules::Module;
pub use modules::ModulesHandler;
pub use modules::ShutdownClient;

Modules§

bus
Core message bus primitives.
modules
utils

Macros§

bus_client
Declare a typed bus client struct.
handle_messages
Build a tokio::select!-based event loop for a bus client.
info_span_ctx
Create a tracing::info_span and, when the instrumentation feature is enabled, set its OpenTelemetry parent to $ctx.
log_debug
Macro designed to log warnings
log_error
Macro designed to log errors
log_warn
Macro designed to log warnings
module_bus_client
Declare a typed bus client struct for a Module.
module_handle_messages
Event loop macro for modules.
static_type_map
This creates a struct that provides Pick for each type in the initial list, allowing generic code to access the inner values. Only one value of each type can be stored in the struct. (Somewhat similar to frunk HList or a fancier named tuple).

Structs§

KeyValue
A key-value pair describing an attribute.