liminal-server 0.2.1

Standalone server for the liminal messaging bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

use haematite::{Database, DatabaseConfig, EventStore};
use liminal::durability::bridge::block_on;
use liminal::durability::{
    DurabilityError, DurableStore, HaematiteStore, MessageEnvelope as DurableEnvelope, StoredEntry,
};
use liminal::protocol::{CausalContext, MessageEnvelope, SchemaId};
use tempfile::TempDir;

use super::services::{ConnectionServices, LiminalConnectionServices};
use crate::config::types::{ChannelDef, ServerConfig};

/// Creates a haematite-backed [`DurableStore`] whose data lives under a `db`
/// subdirectory of a newly created temporary directory.
///
/// The [`TempDir`] guard is returned next to the store; callers keep it alive
/// for as long as they use the store.
///
/// # Errors
/// Propagates a failure to create the temporary directory or the database.
fn disk_store() -> Result<(Arc<dyn DurableStore>, TempDir), Box<dyn std::error::Error>> {
    let temp_dir = tempfile::tempdir()?;
    let config = DatabaseConfig {
        data_dir: temp_dir.path().join("db"),
        shard_count: 4,
        sweep_interval: None,
        distributed: None,
    };
    let events = Arc::new(EventStore::new(Database::create(config)?));
    let store: Arc<dyn DurableStore> = Arc::new(HaematiteStore::new(events));
    Ok((store, temp_dir))
}

// Store stream key of the durable `orders` channel used throughout these tests.
// The durable runtime channel maps onto a single partition, so its store stream
// key is "<channel>:0" (see `DurableChannel::stream_key_for`).
// The tests read persisted payloads back under this key, and the failing store
// double uses it to recognise appends made by the channel itself.
const ORDERS_STREAM_KEY: &str = "orders:0";

/// Publishing to a durable channel and then running the shutdown flush leaves
/// every published payload readable from the backing store, in publish order.
#[test]
fn shutdown_flush_persists_durable_channel_state_to_store() -> Result<(), Box<dyn std::error::Error>>
{
    let (store, _dir) = disk_store()?;
    let config = durable_orders_config()?;
    let services = LiminalConnectionServices::from_config_with_store(&config, store)?;

    // JSON payloads, because the channel schema validates them as JSON. They go
    // through the same publish path a connection process uses.
    let expected: Vec<Vec<u8>> = vec![br#"{"order":1}"#.to_vec(), br#"{"order":2}"#.to_vec()];
    for payload in &expected {
        services.publish("orders", &order_envelope(payload.clone()), None)?;
    }

    // The graceful-shutdown durable flush under test.
    services.flush_durable_state()?;

    // Read straight from the store the durable channel wrote to. Had
    // `Channel::flush`/`publish` still been the old `drop(lock)` no-op, the
    // stream would be empty and the comparison below would fail.
    let persisted = read_payloads(services.durable_store().as_ref(), ORDERS_STREAM_KEY)?;
    assert_eq!(persisted, expected);

    Ok(())
}

/// A message flushed by one services instance is still readable through a
/// second instance built over the same store.
#[test]
fn persisted_durable_state_survives_fresh_services_over_same_store()
-> Result<(), Box<dyn std::error::Error>> {
    let (store, _dir) = disk_store()?;
    let payload = br#"{"order":7}"#.to_vec();

    // First "process lifetime": publish, run the shutdown flush, then drop the
    // services at the end of this scope.
    {
        let first_lifetime = LiminalConnectionServices::from_config_with_store(
            &durable_orders_config()?,
            Arc::clone(&store),
        )?;
        first_lifetime.publish("orders", &order_envelope(payload.clone()), None)?;
        first_lifetime.flush_durable_state()?;
    }

    // Restart at the store level: brand-new services over the SAME store data
    // must see the message persisted above.
    let second_lifetime =
        LiminalConnectionServices::from_config_with_store(&durable_orders_config()?, store)?;
    let persisted = read_payloads(second_lifetime.durable_store().as_ref(), ORDERS_STREAM_KEY)?;
    assert_eq!(persisted, vec![payload]);

    Ok(())
}

/// 13-L1 load-bearing: publishing twice with the SAME idempotency key reaches a
/// live subscriber EXACTLY ONCE (dedup-on-delivery), and the delivery ack tells
/// the truth about `delivered` on every publish.
///
/// The test goes through the real server publish path (`services.publish`, as a
/// connection process would) with a real channel subscriber, and finally drains
/// the subscriber inbox to prove the duplicate never arrived.
#[test]
fn duplicate_idempotency_key_delivers_to_subscriber_exactly_once()
-> Result<(), Box<dyn std::error::Error>> {
    let (store, _dir) = disk_store()?;
    let config = ephemeral_orders_config()?;
    let services = LiminalConnectionServices::from_config_with_store(&config, store)?;

    // Subscribe before publishing so that deliveries are observable.
    let subscription = services.subscribe_handle_for_test("orders")?;

    let k1_payload = br#"{"order":1}"#.to_vec();
    let k2_payload = br#"{"order":2}"#.to_vec();

    // Key "k1", first time: a fresh dedup claim with one subscriber, so the ack
    // must report a genuine delivery.
    let fresh = services.publish("orders", &order_envelope(k1_payload.clone()), Some("k1"))?;
    assert!(
        fresh.delivered,
        "first publish of a fresh key with a subscriber must report a genuine delivery"
    );

    // Key "k1" again: dedup suppresses the fan-out, so the ack reports NOT
    // delivered and no second copy may reach the subscriber.
    let repeat = services.publish("orders", &order_envelope(k1_payload.clone()), Some("k1"))?;
    assert!(
        !repeat.delivered,
        "a duplicate idempotency key must be suppressed (no second delivery)"
    );

    // Key "k2" identifies a distinct message and is delivered normally.
    let distinct = services.publish("orders", &order_envelope(k2_payload.clone()), Some("k2"))?;
    assert!(
        distinct.delivered,
        "a different idempotency key must be delivered"
    );

    // Empty the inbox: exactly the two distinct messages (k1 once, k2 once), in
    // publish order, and never the suppressed duplicate.
    let mut inbox = Vec::new();
    while let Some(message) = subscription.try_next()? {
        inbox.push(message.payload);
    }
    assert_eq!(
        inbox,
        vec![k1_payload, k2_payload],
        "subscriber must receive each distinct key once and never the duplicate"
    );

    Ok(())
}

/// 13-L1: when nobody is subscribed, a publish still succeeds, but its delivery
/// ack carries `delivered = false` (the bus accepted it, no one received it).
#[test]
fn publish_without_subscriber_reports_not_delivered() -> Result<(), Box<dyn std::error::Error>> {
    let (store, _dir) = disk_store()?;
    let config = ephemeral_orders_config()?;
    let services = LiminalConnectionServices::from_config_with_store(&config, store)?;

    // No subscription is created before this publish.
    let unheard = order_envelope(br#"{"order":9}"#.to_vec());
    let ack = services.publish("orders", &unheard, None)?;
    assert!(
        !ack.delivered,
        "a publish that reaches no subscriber must report a non-delivery ack"
    );

    Ok(())
}

/// Regression test for the "dedup claim leaks `InFlight` forever on publish
/// failure" bug: when `publish_with_delivery` fails, the dedup claim it took
/// must be released, so publishing the same key again is DELIVERED rather than
/// suppressed for good.
///
/// The failure comes from a store double that rejects appends to the durable
/// channel stream (`orders:0`) and lets every other append through. That way
/// the claim succeeds, the durable persist fails, and the release can still
/// write its tombstone. Without the release fix this test MUST fail, because
/// the re-publish would find the key `InFlight` and report `delivered = false`.
#[test]
fn publish_failure_releases_claim_so_reclaim_is_delivered() -> Result<(), Box<dyn std::error::Error>>
{
    let (backing, _dir) = disk_store()?;
    // Reject only the durable-channel append; dedup-namespace appends pass.
    let injector = Arc::new(FailingAppendStore::new(backing, |key| {
        key == ORDERS_STREAM_KEY
    }));
    let store: Arc<dyn DurableStore> = Arc::clone(&injector) as Arc<dyn DurableStore>;
    let config = durable_orders_config()?;
    let services = LiminalConnectionServices::from_config_with_store(&config, store)?;

    // Subscribe up front so the successful delivery is observable.
    let subscription = services.subscribe_handle_for_test("orders")?;
    let payload = br#"{"order":1}"#.to_vec();

    // Attempt 1 with key "k1": the claim is taken, the durable persist fails,
    // and publish returns Err. Before the fix the claim stayed InFlight.
    let failed_attempt = services.publish("orders", &order_envelope(payload.clone()), Some("k1"));
    assert!(
        failed_attempt.is_err(),
        "publish must surface the injected durable-append failure"
    );

    // Attempt 2 with the SAME key once injection is disarmed. A released claim
    // makes this a fresh claim that is genuinely delivered; a leaked claim
    // would suppress it (delivered = false).
    injector.clear_failure();
    let second_attempt =
        services.publish("orders", &order_envelope(payload.clone()), Some("k1"))?;
    assert!(
        second_attempt.delivered,
        "after a failed publish releases its claim, the re-publish must be delivered, not suppressed"
    );

    // Only the successful retry may have reached the subscriber.
    let mut inbox = Vec::new();
    while let Some(message) = subscription.try_next()? {
        inbox.push(message.payload);
    }
    assert_eq!(
        inbox,
        vec![payload],
        "the retry delivers exactly once; the failed publish delivered nothing"
    );
    Ok(())
}

/// Best-effort release: if `release_claim` ITSELF fails, `publish` must still
/// hand back the ORIGINAL publish error (not the release error) and must not
/// panic.
#[test]
fn publish_failure_with_failing_release_returns_original_error()
-> Result<(), Box<dyn std::error::Error>> {
    let (backing, _dir) = disk_store()?;
    // Failure plan of the decorator built below:
    //   * the channel stream (`orders:0`) rejects every append, which makes the
    //     channel persist fail (the ORIGINAL error);
    //   * any other stream accepts its first append and rejects later ones, so
    //     the dedup CLAIM (first dedup-stream append) succeeds and lets the
    //     publish path reach publish_with_delivery, while the release
    //     tombstone (second dedup-stream append) fails, i.e. release_claim
    //     ITSELF errors.
    let store: Arc<dyn DurableStore> =
        Arc::new(FailingAppendStore::fail_after_first_per_stream(backing));
    let config = durable_orders_config()?;
    let services = LiminalConnectionServices::from_config_with_store(&config, store)?;

    // Run the publish under `catch_unwind` so a panic becomes a test error
    // with a clear message.
    let guarded = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        services.publish(
            "orders",
            &order_envelope(br#"{"order":1}"#.to_vec()),
            Some("k1"),
        )
    }));
    let publish_result = match guarded {
        Ok(outcome) => outcome,
        Err(_) => return Err("publish panicked under failing release".into()),
    };
    let Err(error) = publish_result else {
        return Err("publish must surface the original failure".into());
    };

    let message = error.to_string();
    assert!(
        message.contains("liminal publish failed"),
        "must return the ORIGINAL publish error, got: {message}"
    );
    Ok(())
}

fn ephemeral_orders_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    Ok(ServerConfig {
        listen_address: "127.0.0.1:0".parse()?,
        health_listen_address: "127.0.0.1:0".parse()?,
        drain_timeout_ms: 30_000,
        channels: vec![ChannelDef {
            name: "orders".to_owned(),
            schema_ref: "schemas/orders.json".to_owned(),
            durable: false,
        }],
        routing_rules: Vec::new(),
        persistence_path: None,
        cluster: None,
    })
}

fn durable_orders_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
    Ok(ServerConfig {
        listen_address: "127.0.0.1:0".parse()?,
        health_listen_address: "127.0.0.1:0".parse()?,
        drain_timeout_ms: 30_000,
        channels: vec![ChannelDef {
            name: "orders".to_owned(),
            schema_ref: "schemas/orders.json".to_owned(),
            durable: true,
        }],
        routing_rules: Vec::new(),
        persistence_path: None,
        cluster: None,
    })
}

/// Wraps `payload` in a protocol [`MessageEnvelope`] that carries an all-zero
/// schema id and an independent causal context.
fn order_envelope(payload: Vec<u8>) -> MessageEnvelope {
    let schema = SchemaId::new([0_u8; SchemaId::WIRE_LEN]);
    let causality = CausalContext::independent();
    MessageEnvelope::new(schema, causality, payload)
}

/// Reads up to 1024 entries of `stream_key` starting at offset 0 and returns
/// the application payload of each entry, in the order the store returned them.
///
/// # Errors
/// Propagates a bridge failure from `block_on`, a store read error, or the
/// first entry that fails to deserialize.
fn read_payloads(
    store: &dyn DurableStore,
    stream_key: &str,
) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
    // Two layers of `Result`: the first `?` surfaces a bridge timeout, the
    // second a store error.
    let entries: Vec<StoredEntry> = block_on(store.read_from(stream_key, 0, 1024))??;
    entries
        .into_iter()
        .map(|entry| -> Result<Vec<u8>, Box<dyn std::error::Error>> {
            // Each stored entry holds a serialized durability envelope; the
            // application payload sits inside it.
            let envelope = DurableEnvelope::deserialize(&entry.payload)?;
            Ok(envelope.payload)
        })
        .collect()
}

/// Append-failure strategy for [`FailingAppendStore`].
///
/// The strategy is only consulted while the store is armed; see
/// `FailingAppendStore::should_fail`.
enum FailMode {
    /// Fail every append whose stream key matches the predicate.
    Predicate(fn(&str) -> bool),
    /// Fail the durable channel append unconditionally, and fail every dedup-stream
    /// append EXCEPT the first one per stream. This lets the dedup CLAIM (first
    /// dedup-stream append) succeed so the publish path reaches the channel persist,
    /// which fails (the ORIGINAL error), and then the dedup RELEASE tombstone
    /// (second dedup-stream append) also fails (so `release_claim` ITSELF errors).
    ///
    /// The map holds, per stream key, how many appends have been checked so far
    /// for streams other than the channel stream.
    ChannelAlwaysDedupAfterFirst(Mutex<HashMap<String, u32>>),
}

/// `DurableStore` decorator that injects append failures for testing the publish
/// failure path. While armed, an `append` selected by the [`FailMode`] returns a
/// store error; every other operation delegates to the inner store.
/// [`Self::clear_failure`] disarms the injection so a retry can succeed.
#[derive(Debug)]
struct FailingAppendStore {
    // Real store that receives every call that is not failed by injection.
    inner: Arc<dyn DurableStore>,
    // `true` while injection is active; both constructors start armed and
    // `clear_failure` sets it to `false`.
    armed: AtomicBool,
    // Decides which appends fail while armed.
    mode: FailMode,
}

/// Debug output is the bare variant name; the payload (a function pointer or
/// the counter map) is deliberately left out.
impl std::fmt::Debug for FailMode {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::Predicate(_) => "Predicate",
            Self::ChannelAlwaysDedupAfterFirst(_) => "ChannelAlwaysDedupAfterFirst",
        };
        formatter.write_str(variant)
    }
}

impl FailingAppendStore {
    fn new(inner: Arc<dyn DurableStore>, should_fail: fn(&str) -> bool) -> Self {
        Self {
            inner,
            armed: AtomicBool::new(true),
            mode: FailMode::Predicate(should_fail),
        }
    }

    fn fail_after_first_per_stream(inner: Arc<dyn DurableStore>) -> Self {
        Self {
            inner,
            armed: AtomicBool::new(true),
            mode: FailMode::ChannelAlwaysDedupAfterFirst(Mutex::new(HashMap::new())),
        }
    }

    fn clear_failure(&self) {
        self.armed.store(false, Ordering::SeqCst);
    }

    fn should_fail(&self, stream_key: &str) -> Result<bool, DurabilityError> {
        if !self.armed.load(Ordering::SeqCst) {
            return Ok(false);
        }
        match &self.mode {
            FailMode::Predicate(predicate) => Ok(predicate(stream_key)),
            FailMode::ChannelAlwaysDedupAfterFirst(seen) => {
                if stream_key == ORDERS_STREAM_KEY {
                    return Ok(true);
                }
                let mut seen = seen
                    .lock()
                    .map_err(|_| DurabilityError::ConfigError("test lock poisoned".to_owned()))?;
                let count = seen.entry(stream_key.to_owned()).or_insert(0);
                let fail = *count > 0;
                *count += 1;
                drop(seen);
                Ok(fail)
            }
        }
    }
}

/// Only `append` is subject to failure injection; every other operation is
/// passed to the inner store unchanged.
#[async_trait::async_trait]
impl DurableStore for FailingAppendStore {
    /// Returns the injected error when `should_fail` selects this append, and
    /// otherwise forwards it to the inner store.
    async fn append(
        &self,
        stream_key: &str,
        payload: Vec<u8>,
        expected_seq: u64,
    ) -> Result<u64, DurabilityError> {
        if self.should_fail(stream_key)? {
            Err(DurabilityError::ConfigError(format!(
                "injected append failure for stream '{stream_key}'"
            )))
        } else {
            self.inner.append(stream_key, payload, expected_seq).await
        }
    }

    /// Pass-through to the inner store.
    async fn read_from(
        &self,
        stream_key: &str,
        offset: u64,
        limit: usize,
    ) -> Result<Vec<StoredEntry>, DurabilityError> {
        self.inner.read_from(stream_key, offset, limit).await
    }

    /// Pass-through to the inner store.
    async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError> {
        self.inner.cas(key, old_value, new_value).await
    }

    /// Pass-through to the inner store.
    async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError> {
        self.inner.read_value(key).await
    }

    /// Pass-through to the inner store.
    async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
        self.inner.scan(prefix).await
    }

    /// Pass-through to the inner store.
    async fn flush(&self) -> Result<(), DurabilityError> {
        self.inner.flush().await
    }
}