mosaik 0.2.3

A Rust runtime for building self-organizing, leaderless distributed systems.
<p align="center">
  <h1 align="center">mosaik</h1>
  <p align="center">
    A Rust runtime for building self-organizing, leaderless distributed systems.
  </p>
  <p align="center">
    <a href="https://github.com/flashbots/mosaik/blob/main/LICENSE"><img alt="License: MIT" src="https://img.shields.io/badge/license-MIT-blue.svg"></a>
    <a href="https://crates.io/crates/mosaik"><img alt="Crates.io version" src="https://img.shields.io/crates/v/mosaik.svg?color=blue"></a>
    <a href="https://github.com/flashbots/mosaik"><img alt="Rust" src="https://img.shields.io/badge/rust-1.89%2B-blue.svg"></a>
    <a href="https://github.com/flashbots/mosaik"><img alt="Status" src="https://img.shields.io/badge/status-experimental-orange.svg"></a>
  </p>
</p>

---

> [!WARNING]
> **Experimental research software.** Mosaik is under active development. APIs and wire protocols may change without notice. Production quality is targeted for `v1.0`.

# Overview

Mosaik provides primitives for automatic peer discovery, typed pub/sub data streams, availability groups with Raft consensus, and synchronized data stores. Nodes deployed on plain VMs self-organize into a functioning topology using only a secret key, a gossip seed, and role tags — **no orchestration, configuration templates, or DevOps glue required.**

The core claim: when binaries are deployed on arbitrary machines, the network should self-organize, infer its own data-flow graph, and converge to a stable operational topology. This property is foundational for scaling the system, adding new capabilities, and reducing operational complexity.

Mosaik initially targets trusted, permissioned networks such as L2 chains controlled by a single organization. All members are assumed honest; the system is not yet Byzantine fault tolerant.

> [!TIP]
> To see mosaik in action, browse the integration tests in the [`tests/`](tests/) directory.

# Core Primitives

## Discovery

Gossip-based peer discovery and catalog synchronization. Nodes announce their presence, capabilities (tags), and available streams/groups/stores. The catalog converges across the network through two complementary protocols:

- **Announcements** — real-time broadcast of peer presence and metadata changes via `iroh-gossip`, with signed entries and periodic re-announcements
- **Catalog Sync** — full bidirectional catalog exchange for initial catch-up and on-demand synchronization
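
As a rough illustration of how a bidirectional catalog exchange can converge, here is a minimal sketch. It assumes each entry carries a per-peer version that increases with every announcement; `Entry`, `Catalog`, `merge`, and `sync` are hypothetical names, not mosaik's actual types, and signature verification is left out.

```rust
use std::collections::HashMap;

/// Hypothetical catalog entry (not mosaik's actual type).
#[derive(Clone, Debug, PartialEq)]
struct Entry {
    /// Assumed to increase with each announcement from the peer.
    version: u64,
    tags: Vec<String>,
}

/// Hypothetical catalog keyed by a peer id string.
type Catalog = HashMap<String, Entry>;

/// Merges `remote` into `local`, keeping the newest entry per peer.
/// Returns the number of local entries that were added or replaced.
fn merge(local: &mut Catalog, remote: &Catalog) -> usize {
    let mut changed = 0;
    for (peer, theirs) in remote {
        let newer = match local.get(peer) {
            Some(ours) => theirs.version > ours.version,
            None => true,
        };
        if newer {
            local.insert(peer.clone(), theirs.clone());
            changed += 1;
        }
    }
    changed
}

/// One bidirectional sync round. Both sides end up with the same catalog,
/// provided equal versions always describe equal entries.
fn sync(a: &mut Catalog, b: &mut Catalog) {
    let snapshot_a = a.clone();
    merge(a, b);
    merge(b, &snapshot_a);
}
```

Because the merge only ever moves an entry forward to a higher version, repeating the exchange is harmless, which is what makes periodic re-announcements and on-demand syncs safe to combine.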

Discovery is largely transparent and ships with sensible defaults. To spin up a node on a given network, just provide a `NetworkId`:

```rust
use mosaik::*;

let network_id = NetworkId::random();
let node = Network::new(network_id).await?;
```

For finer control, use `NetworkBuilder` to customize discovery settings such as tags or bootstrap peers:

```rust
let n0 = Network::builder(network_id)
  .with_discovery(
    discovery::Config::builder()
      .with_tags("tag1")
      .with_tags(["tag2", "tag3"])
      .with_bootstrap([peer_id1, peer_id2])
  ).build().await?;
```

## Streams

Typed async pub/sub data channels connecting producers and consumers across the network. Any serializable type automatically implements `Datum` and can be streamed.

```rust
use futures::{SinkExt, StreamExt}; // brings `send` and `next` into scope

let network_id = NetworkId::random();

let n0 = Network::new(network_id).await?;
let n1 = Network::new(network_id).await?;
let n2 = Network::new(network_id).await?;

let mut p0 = n0.streams().produce::<Data1>();
let mut c1 = n1.streams().consume::<Data1>();
let mut c2 = n2.streams().consume::<Data1>();

// await topology formation
p0.when().subscribed().minimum_of(2).await;

// produce item (implements futures::Sink)
p0.send(Data1(42)).await?;

// consume item (implements futures::Stream)
assert_eq!(c1.next().await, Some(Data1(42)));
assert_eq!(c2.next().await, Some(Data1(42)));
```

Producers and consumers can be further configured:

```rust
let producer = network.streams()
  .producer::<Data1>()
  .accept_if(|peer| peer.tags().contains("tag1"))
  .online_when(|c| c.minimum_of(2))
  .with_max_consumers(4)
  .build();

let consumer = network.streams()
  .consumer::<Data1>()
  .subscribe_if(|peer| peer.tags().contains("tag2"))
  .build();
```

Key features:

- **Consumer predicates** — conditions for accepting subscribers (auth, attestation, tags)
- **Producer limits** — cap subscriber count or egress bandwidth
- **Online conditions** — define when a producer/consumer is ready (e.g., "online when ≥2 subscribers with tag X are connected")
- **Per-subscription stats** — datums count, bytes count, uptime tracking
- **Backpressure** — slow consumers are disconnected to prevent head-of-line blocking
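
The backpressure policy in the list above (drop slow consumers rather than stall everyone) can be sketched with bounded channels from the standard library. This is an illustration of the idea only: `FanOut` is a hypothetical type, and mosaik's real implementation runs over network connections rather than in-process channels.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical in-process fan-out producer (illustrative only).
struct FanOut<T> {
    subscribers: Vec<SyncSender<T>>,
    /// Per-subscriber buffer size; assumed to be at least 1.
    capacity: usize,
}

impl<T: Clone> FanOut<T> {
    fn new(capacity: usize) -> Self {
        Self { subscribers: Vec::new(), capacity }
    }

    /// Registers a consumer with its own bounded buffer.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.push(tx);
        rx
    }

    /// Sends `item` to every subscriber without blocking.
    /// A subscriber whose buffer is full, or who has hung up, is dropped,
    /// so one slow consumer never delays the others.
    /// Returns the number of subscribers still connected.
    fn publish(&mut self, item: T) -> usize {
        self.subscribers.retain(|tx| match tx.try_send(item.clone()) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
        });
        self.subscribers.len()
    }
}
```

A dropped consumer can still drain what was already buffered; after that its receiver reports a disconnect, which is the signal to resubscribe.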


## Collections

Replicated, eventually consistent data structures built on top of Groups. Each collection is backed by a Raft-replicated state machine. Every collection has a **writer** (can mutate) and a **reader** (read-only replica that tracks the writer's state).

All mutations return a `Version` that can be awaited on readers via `when().reaches(ver)` to confirm convergence.
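
To make the version-based convergence check concrete, here is a blocking, standard-library analogue of waiting for a replica to reach a version. It is a sketch under assumptions: `Version` is modeled as a plain `u64`, and `Applied` is a hypothetical helper; mosaik's own API is async and its `Version` type may look different.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for a collection version.
type Version = u64;

/// Tracks the latest version a replica has applied and lets callers
/// block until a target version is reached.
#[derive(Clone, Default)]
struct Applied {
    inner: Arc<(Mutex<Version>, Condvar)>,
}

impl Applied {
    /// Called by the replication path after a mutation has been applied.
    /// Versions only move forward; stale updates are ignored.
    fn advance(&self, to: Version) {
        let (lock, cvar) = &*self.inner;
        let mut current = lock.lock().unwrap();
        if to > *current {
            *current = to;
            cvar.notify_all();
        }
    }

    /// Blocks until the applied version is at least `target`,
    /// then returns the version that was observed.
    fn reaches(&self, target: Version) -> Version {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        let guard = cvar.wait_while(guard, |v| *v < target).unwrap();
        *guard
    }
}
```

The useful property is that the wait is expressed against a version rather than a point in time: a reader that is already past the target returns immediately, and one that is behind wakes as soon as it catches up.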

### `Map<K, V>`

Replicated unordered key-value store.

```rust
let store_id = StoreId::random();

// On the writer node
let map = mosaik::collections::Map::<String, u64>::new(&network, store_id);
map.when().online().await;

map.insert("alice".into(), 100).await?;
map.extend(vec![("bob".into(), 200), ("carol".into(), 300)]).await?;

assert_eq!(map.get(&"alice".into()), Some(100));
assert!(map.contains_key(&"bob".into()));
assert_eq!(map.len(), 3);

map.remove("carol".into()).await?;

// On a reader node
let reader = mosaik::collections::Map::<String, u64>::reader(&network, store_id);
reader.when().online().await;
assert_eq!(reader.get(&"bob".into()), Some(200));

let ver = map.clear().await?;
reader.when().reaches(ver).await;
assert!(reader.is_empty());
```

### `Vec<T>`

Replicated ordered, index-addressable sequence.

```rust
let store_id = StoreId::random();

let vec_writer = mosaik::collections::Vec::<u64>::writer(&network, store_id);
vec_writer.when().online().await;

// Push to front and back
vec_writer.push_back(42).await?;
vec_writer.push_front(10).await?;
vec_writer.extend([7, 13, 21]).await?;

// On a reader node
let vec_reader = mosaik::collections::Vec::<u64>::reader(&network, store_id);
vec_reader.when().online().await;

assert_eq!(vec_reader.get(0), Some(10));
assert_eq!(vec_reader.get(1), Some(42));
assert_eq!(vec_reader.get(2), Some(7));
assert_eq!(vec_reader.get(3), Some(13));

let ver = vec_writer.clear().await?;
vec_reader.when().reaches(ver).await;
assert!(vec_reader.is_empty());
```

### `Set<T>`

Replicated unordered collection of unique values.

```rust
let store_id = StoreId::random();

let set = mosaik::collections::Set::<u64>::new(&network, store_id);
set.when().online().await;

set.insert(42).await?;
set.extend([7, 13, 21]).await?;
set.insert(42).await?;       // duplicate — len stays 4
set.remove(7).await?;

// On a reader node
let reader = mosaik::collections::Set::<u64>::reader(&network, store_id);
reader.when().online().await;
assert!(reader.contains(&13));
assert!(!reader.contains(&7));
assert_eq!(reader.len(), 3);
```

## Groups

Availability groups — clusters of trusted nodes that coordinate for failover and shared state. Built on a **modified Raft consensus** optimized for trusted environments:

```rust
let group = network.groups()
    .with_key(group_key)
    .with_state_machine(counter)
    .with_storage(InMemory::default())
    .join()
    .await?;

// Wait for leader election
group.when().leader_elected().await;

// Replicate a command
group.execute(Increment(5), Consistency::Strong).await?;
```
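
The snippet above passes a `counter` state machine and an `Increment` command without defining them. The following sketch shows what such a pair could look like. The `StateMachine` trait here is a hypothetical, simplified stand-in with associated types chosen for illustration; mosaik's actual trait may have a different shape, and `Get` and `replay` are likewise invented for this example.

```rust
/// Hypothetical, simplified trait: mutations go through `apply`,
/// reads go through `query`.
trait StateMachine {
    type Command;
    type Query;
    type Output;

    fn apply(&mut self, command: Self::Command);
    fn query(&self, query: Self::Query) -> Self::Output;
}

#[derive(Default)]
struct Counter {
    value: i64,
}

/// Command: add the given amount to the counter.
struct Increment(i64);

/// Hypothetical query: read the current value.
struct Get;

impl StateMachine for Counter {
    type Command = Increment;
    type Query = Get;
    type Output = i64;

    fn apply(&mut self, Increment(by): Increment) {
        self.value += by;
    }

    fn query(&self, _query: Get) -> i64 {
        self.value
    }
}

/// Replays a committed log on a fresh replica. Because `apply` is
/// deterministic, every replica that replays the same log reaches the same state.
fn replay<M: StateMachine + Default>(log: impl IntoIterator<Item = M::Command>) -> M {
    let mut machine = M::default();
    for command in log {
        machine.apply(command);
    }
    machine
}
```

Keeping `apply` free of randomness, clocks, and I/O is what lets a replicated log stand in for shared state.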

Key features:

- **Bonded mesh** — every pair of members maintains a persistent bidirectional connection, authenticated via HMAC-derived proofs of a shared group secret
- **Non-voting followers** — nodes behind the leader's log abstain from votes, preventing stale nodes from disrupting elections
- **Dynamic quorum** — abstaining nodes excluded from the quorum denominator
- **Distributed log catch-up** — lagging followers partition the log range across responders and pull in parallel
- **Replicated state machines** — implement the `StateMachine` trait with `apply(command)` for mutations and `query(query)` for reads
- **Consistency levels**: `Weak` (local, possibly stale) vs `Strong` (forwarded to leader)
- **Reactive conditions**: `when().is_leader()`, `when().is_follower()`, `when().leader_changed()`, `when().is_online()`
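
Two of the features above reduce to small pieces of arithmetic, sketched here under stated assumptions. `quorum` assumes a simple majority over the members that actually vote, and `partition` assumes a lagging follower splits the missing log range into near-equal contiguous chunks, one per responder. Both functions are hypothetical helpers, not mosaik's implementation.

```rust
use std::ops::Range;

/// Majority quorum over voting members only: abstaining nodes are
/// removed from the denominator before taking the majority.
fn quorum(members: usize, abstaining: usize) -> usize {
    let voters = members.saturating_sub(abstaining);
    voters / 2 + 1
}

/// Splits `range` into at most `responders` contiguous chunks of
/// near-equal size, so each chunk can be pulled from a different peer.
fn partition(range: Range<u64>, responders: u64) -> Vec<Range<u64>> {
    let len = range.end.saturating_sub(range.start);
    if len == 0 || responders == 0 {
        return Vec::new();
    }
    // Never create more chunks than there are log entries.
    let parts = responders.min(len);
    let base = len / parts;
    let extra = len % parts;

    let mut chunks = Vec::with_capacity(parts as usize);
    let mut start = range.start;
    for i in 0..parts {
        // The first `extra` chunks take one additional entry each.
        let size = base + u64::from(i < extra);
        chunks.push(start..start + size);
        start += size;
    }
    chunks
}
```

For example, in a five-member group where two stale followers abstain, the quorum drops from 3 to 2, and a follower missing entries `100..110` with three responders would fetch `100..104`, `104..107`, and `107..110` in parallel.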

# Architecture

Mosaik is built on [iroh](https://github.com/n0-computer/iroh) for QUIC-based peer-to-peer networking with relay support.

```text
┌─────────────────────────────────────────────┐
│                  Network                    │
│                                             │
│  ┌───────────┐  ┌─────────┐  ┌───────────┐  │
│  │ Discovery │  │ Streams │  │  Groups   │  │
│  │           │  │         │  │           │  │
│  │ Announce  │  │Producer │  │  Bonds    │  │
│  │ Catalog   │  │Consumer │  │  Raft     │  │
│  │ Sync      │  │Status   │  │  RSM      │  │
│  └───────────┘  └─────────┘  └───────────┘  │
│                                             │
│  ┌─────────────┐  ┌──────────────────────┐  │
│  │ Collections │  │    Transport (iroh)  │  │
│  │ Map Vec Set │  │  QUIC · Relay · mDNS │  │
│  └─────────────┘  └──────────────────────┘  │
└─────────────────────────────────────────────┘
```

# Repository Layout

| Path               | Description                                                           |
| ------------------ | --------------------------------------------------------------------- |
| `src/`             | Core library — all shared primitives, protocols, and APIs             |
| `src/discovery/`   | Peer discovery, announcement, and catalog synchronization             |
| `src/streams/`     | Typed pub/sub: producers, consumers, status conditions, criteria      |
| `src/groups/`      | Availability groups: bonds, Raft consensus, replicated state machines |
| `src/collections/` | Replicated data structures: `Map`, `Vec`, `Set`                       |
| `src/network/`     | Transport layer, connection management, typed links                   |
| `src/primitives/`  | Identifiers (`Digest`), formatting helpers, async work queues         |
| `src/builtin/`     | Built-in implementations (`NoOp` state machine, `InMemory` storage)   |
| `tests/`           | Integration tests organized by subsystem                              |

# Getting Started

## Prerequisites

- Rust toolchain **≥ 1.89**: install with `rustup toolchain install 1.89`

## Usage

```bash
cargo add mosaik
```

Or add it to `Cargo.toml` directly:

```toml
[dependencies]
mosaik = "0.2.1"
```

## Scenario Tests

Read through the scenario tests in the [`tests/`](tests/) directory for practical examples of mosaik capabilities.

```bash
# Run all integration tests
TEST_TRACE=on cargo test --test basic -- --test-threads=1

# Run only the tests matching a filter (here `collections::map`), in release mode
TEST_TRACE=on cargo test --release --test basic collections::map -- --test-threads=1

# Verbose test output with tracing
TEST_TRACE=on cargo test --test basic groups::leader::is_elected
TEST_TRACE=trace cargo test --test basic groups::leader::is_elected
```

If the tests run on a slow network, extend the timeouts by setting the `TIME_FACTOR` environment variable, which multiplies all timeout durations by the given value, e.g.:

```bash
TIME_FACTOR=3 TEST_TRACE=on cargo test --test basic groups::leader::is_elected
```
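
When writing new tests, a multiplier like this is easy to honor with a small helper. The sketch below is one possible way to do it; `parse_factor`, `scaled`, and `scaled_from_env` are hypothetical names, not the helpers used in this repository's test suite, and the fallback to `1.0` on missing or invalid input is an assumption.

```rust
use std::time::Duration;

/// Parses a timeout multiplier, falling back to 1.0 when the value is
/// missing, unparsable, non-finite, or not strictly positive.
fn parse_factor(raw: Option<&str>) -> f64 {
    raw.and_then(|s| s.trim().parse::<f64>().ok())
        .filter(|f| f.is_finite() && *f > 0.0)
        .unwrap_or(1.0)
}

/// Multiplies a base timeout by the given factor.
/// Note: `Duration::mul_f64` panics if the result overflows.
fn scaled(base: Duration, factor: f64) -> Duration {
    base.mul_f64(factor)
}

/// Reads `TIME_FACTOR` from the environment and scales `base` by it.
fn scaled_from_env(base: Duration) -> Duration {
    let raw = std::env::var("TIME_FACTOR").ok();
    scaled(base, parse_factor(raw.as_deref()))
}
```

Routing every timeout through a helper like `scaled_from_env` keeps the base values readable in the test code while still letting a single variable stretch them all.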

# Roadmap

## Stage 1: Primitives *(current)*

Core primitives for building self-organized distributed systems in trusted, permissioned networks.

- [x] **Discovery** — gossip announcements, catalog sync, tags
- [x] **Streams** — producers, consumers, predicates, limits, online conditions, stats
- [x] **Groups** — membership, shared state, failover, load balancing
- [x] **Collections** - Replicated, eventually consistent data structures
- [ ] **Preferences** — ranking producers by latency, geo-proximity
- [ ] **Diagnostics** — network inspection, automatic metrics, developer debug tools

## Stage 2: Trust & Privacy

Advanced stream subscription authorization, attested sandbox runtimes, trust corridors, etc.

## Stage 3: Decentralization & Permissionlessness

Extending the system beyond trusted, single-operator environments.

# Contributing

- **Commits:** concise, imperative subjects referencing the component (e.g., *"Progress on pubsub semantics"*)
- **PRs:** include a summary, linked issues/RFCs, and a checklist confirming `cargo build`, `cargo test`, `cargo clippy`, and `cargo fmt` pass. Attach logs or screenshots for user-visible changes.
- **Tests:** add or extend integration coverage for behavioral changes. Note remaining gaps or follow-up work in the PR body.

# License

MIT — see [LICENSE](LICENSE) for details.