durable-streams-server 0.2.0

Durable Streams protocol server in Rust, built with axum and tokio
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
mod common;

use common::{read_problem, spawn_test_server, test_client, unique_stream_name};

/// Validates spec: 02-append-semantics.md#append-data
///
/// Verifies that POST appends data and returns 204 with Stream-Next-Offset.
#[tokio::test]
async fn test_append_returns_204() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Append data
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("Hello, world!")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 204, "Expected 204 No Content");

    // Check Stream-Next-Offset header
    let next_offset = response
        .headers()
        .get("Stream-Next-Offset")
        .expect("Missing Stream-Next-Offset header")
        .to_str()
        .unwrap();

    // After appending 13 bytes, next offset should be 0000000000000001_000000000000000d
    assert_eq!(next_offset, "0000000000000001_000000000000000d");

    // Body should be empty
    let body = response.bytes().await.unwrap();
    assert!(body.is_empty(), "POST response should have no body");
}

/// Validates spec: 02-append-semantics.md#append-data
///
/// Verifies that multiple appends generate monotonic offsets.
#[tokio::test]
async fn test_multiple_appends_monotonic_offsets() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // First append (5 bytes: "first")
    let response1 = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("first")
        .send()
        .await
        .unwrap();
    let offset1 = response1
        .headers()
        .get("Stream-Next-Offset")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(offset1, "0000000000000001_0000000000000005");

    // Second append (6 bytes: "second")
    let response2 = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("second")
        .send()
        .await
        .unwrap();
    let offset2 = response2
        .headers()
        .get("Stream-Next-Offset")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(offset2, "0000000000000002_000000000000000b");

    // Third append (5 bytes: "third")
    let response3 = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("third")
        .send()
        .await
        .unwrap();
    let offset3 = response3
        .headers()
        .get("Stream-Next-Offset")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(offset3, "0000000000000003_0000000000000010");
}

/// Validates spec: 02-append-semantics.md#content-type-validation
///
/// Verifies that content-type mismatch returns 409.
#[tokio::test]
async fn test_append_content_type_mismatch_returns_409() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream with text/plain
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Try to append with application/json
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "application/json")
        .body(r#"{"key": "value"}"#)
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 409, "Expected 409 Conflict");
}

/// Validates spec: 02-append-semantics.md#content-type-validation
///
/// Verifies that content-type comparison is case-insensitive.
#[tokio::test]
async fn test_append_content_type_case_insensitive() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream with lowercase
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Append with uppercase
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "TEXT/PLAIN")
        .body("data")
        .send()
        .await
        .unwrap();

    assert_eq!(
        response.status(),
        204,
        "Expected 204 for case-insensitive match"
    );
}

/// Validates spec: 02-append-semantics.md#content-type-validation
///
/// Verifies that charset parameter is ignored.
#[tokio::test]
async fn test_append_content_type_ignores_charset() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream without charset
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Append with charset
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain; charset=utf-8")
        .body("data")
        .send()
        .await
        .unwrap();

    assert_eq!(
        response.status(),
        204,
        "Expected 204 when charset is ignored"
    );
}

/// Validates spec: 02-append-semantics.md#append-data
///
/// Verifies that empty body without Stream-Closed returns 400.
#[tokio::test]
async fn test_append_empty_body_without_closed_returns_400() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Try to append empty body without Stream-Closed
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    assert_eq!(
        response.status(),
        400,
        "Expected 400 for empty body without Stream-Closed"
    );
}

/// Validates spec: 02-append-semantics.md#stream-closure
///
/// Verifies that Stream-Closed: true closes the stream.
#[tokio::test]
async fn test_close_stream_without_data() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Close stream without data
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 204, "Expected 204 for close operation");

    // Verify stream is closed via HEAD
    let head_response = client
        .head(format!("{base_url}/v1/stream/{stream_name}"))
        .send()
        .await
        .unwrap();

    let closed = head_response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed header")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");
}

/// Validates spec: 02-append-semantics.md#stream-closure
///
/// Verifies that closing with data appends and then closes.
#[tokio::test]
async fn test_close_stream_with_data() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Close stream with final message
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .body("goodbye")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 204, "Expected 204 for close with data");

    let next_offset = response
        .headers()
        .get("Stream-Next-Offset")
        .unwrap()
        .to_str()
        .unwrap();

    // After appending 7 bytes, next offset should be after the message
    assert_eq!(next_offset, "0000000000000001_0000000000000007");
}

/// Validates spec: 02-append-semantics.md#behavior-after-closure
///
/// Verifies that appending to closed stream returns 409.
#[tokio::test]
async fn test_append_to_closed_stream_returns_409() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Create stream
    client
        .put(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .send()
        .await
        .unwrap();

    // Close stream
    client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .header("Stream-Closed", "true")
        .send()
        .await
        .unwrap();

    // Try to append to closed stream
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("more data")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 409, "Expected 409 Conflict");

    // Verify Stream-Closed header in error response
    let closed = response
        .headers()
        .get("Stream-Closed")
        .expect("Missing Stream-Closed header in error")
        .to_str()
        .unwrap();
    assert_eq!(closed, "true");

    let problem = read_problem(response).await;
    let instance = format!("/v1/stream/{stream_name}");
    assert_eq!(problem.problem_type, "/errors/stream-closed");
    assert_eq!(problem.title, "Stream Closed");
    assert_eq!(problem.status, 409);
    assert_eq!(problem.code, "STREAM_CLOSED");
    assert_eq!(problem.instance.as_deref(), Some(instance.as_str()));
}

/// Validates spec: 02-append-semantics.md#append-data
///
/// Verifies that appending to non-existent stream returns 404.
#[tokio::test]
async fn test_append_to_nonexistent_stream_returns_404() {
    let (base_url, _port) = spawn_test_server().await;
    let client = test_client();
    let stream_name = unique_stream_name();

    // Try to append without creating stream
    let response = client
        .post(format!("{base_url}/v1/stream/{stream_name}"))
        .header("Content-Type", "text/plain")
        .body("data")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 404, "Expected 404 Not Found");

    let problem = read_problem(response).await;
    let instance = format!("/v1/stream/{stream_name}");
    assert_eq!(problem.problem_type, "/errors/not-found");
    assert_eq!(problem.title, "Stream Not Found");
    assert_eq!(problem.status, 404);
    assert_eq!(problem.code, "NOT_FOUND");
    assert_eq!(problem.instance.as_deref(), Some(instance.as_str()));
}