inferd-daemon 0.2.1

The inferd daemon: NDJSON-over-IPC server, admission queue, single-instance lock, router, activity log.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
//! Daemon lifecycle: boot → wait-for-ready → bind listener → accept →
//! dispatch → shutdown.
//!
//! The M1 lifecycle wires:
//! - `lock` — single-instance lock at startup (THREAT_MODEL F-2).
//! - `router` — backend selection (no-op v0.1 — picks the only one).
//! - `endpoint` — listener bound only after `router.all_ready()`
//!   (THREAT_MODEL F-13).
//! - `queue` — admission gate (`SubmitError::QueueFull` → wire
//!   `code: queue_full`).
//! - `inferd-proto` — frame parsing and serialisation.
//!
//! Cancellation: dropping a connection drops the in-flight `TokenStream`,
//! which closes the engine's `tx` and stops the spawned generation task.
//! Per ADR 0007 the daemon emits no terminal frame on cancel — the EOF
//! is the signal.

use crate::auth::{AuthFrame, key_matches};
use crate::endpoint::Connection;
use crate::peercred::PeerIdentity;
use crate::queue::{Admission, SubmitError};
use crate::router::{Router, RouterError};
use inferd_engine::{GenerateError, TokenEvent};
use inferd_proto::{ErrorCode, ProtoError, Request, Response, write_frame};
use std::io;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncWrite, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tracing::{debug, info, warn};

/// Wait until every backend in `router` reports ready, polling at most
/// every 50ms, for up to `timeout`.
///
/// THREAT_MODEL F-13: nothing else creates listeners until this returns.
///
/// Readiness is always checked at least once, so a zero `timeout` still
/// succeeds when the router is already ready. Each sleep is clamped to
/// the time left in the budget, so the function gives up at (not up to
/// one poll interval after) the deadline.
///
/// # Returns
///
/// The time spent waiting, measured from entry to the successful
/// readiness check.
///
/// # Errors
///
/// [`ReadyTimeout`] carrying the original `timeout` when the budget is
/// exhausted before `router.all_ready()` returns `true`.
pub async fn wait_for_ready(router: &Router, timeout: Duration) -> Result<Duration, ReadyTimeout> {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let started = Instant::now();
    loop {
        if router.all_ready() {
            return Ok(started.elapsed());
        }
        // Zero remaining is the same condition as `elapsed >= timeout`.
        let remaining = timeout.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            return Err(ReadyTimeout(timeout));
        }
        // Never sleep past the deadline: the final nap is shortened so the
        // last readiness check happens right at the end of the budget.
        tokio::time::sleep(POLL_INTERVAL.min(remaining)).await;
    }
}

/// Returned when `wait_for_ready` exhausts its budget without seeing
/// readiness across every backend.
///
/// The wrapped `Duration` is the timeout budget the caller passed to
/// `wait_for_ready` (not the time actually spent waiting). It is
/// rendered with `{:?}` formatting in the error message.
#[derive(Debug, thiserror::Error)]
#[error("backend not ready within {0:?}")]
pub struct ReadyTimeout(pub Duration);

/// Per-accept context that the lifecycle hands to every spawned
/// connection task.
///
/// Today it carries the optional TCP API key (THREAT_MODEL F-8) and
/// the shared admission gate (queue_full enforcement). New per-
/// connection policy (rate limits, per-caller quotas) extends this
/// struct rather than each `serve_*` signature.
///
/// The derived `Default` leaves both fields `None`: no TCP auth
/// requirement and no admission gate. The accept loops clone the
/// context once per connection.
#[derive(Clone, Default)]
pub struct AcceptContext {
    /// When `Some` and the connection is TCP, the daemon requires an
    /// auth frame as the first NDJSON line on the wire and constant-
    /// time-compares the key against this value. UDS / pipe ignore
    /// this field — F-7 covers them.
    pub expected_api_key: Option<String>,
    /// Shared admission gate. `None` for tests / dev paths that
    /// don't care about queue depth — those treat every request
    /// as admitted. Production lifecycle always passes `Some`.
    pub admission: Option<Admission>,
}

/// Hand-written `Debug` that never prints the API key itself: the key
/// field is reduced to a "is one configured?" boolean and the admission
/// gate to its capacity (or `None` when no gate is attached).
impl std::fmt::Debug for AcceptContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let key_configured = self.expected_api_key.is_some();
        let gate_capacity = self.admission.as_ref().map(|gate| gate.capacity());

        let mut out = f.debug_struct("AcceptContext");
        out.field("expected_api_key", &key_configured);
        out.field("admission_capacity", &gate_capacity);
        out.finish()
    }
}

/// Handle one accepted client connection: read framed `Request`s and write
/// framed `Response`s until EOF or fatal error.
///
/// Before the request loop, a TCP connection whose `ctx` carries an
/// `expected_api_key` must present a matching auth frame as its first
/// line (THREAT_MODEL F-8). On any auth failure the connection is closed
/// without an error frame and the function returns `Ok(())`.
///
/// Per request:
/// 1. Read one frame (`read_frame_async`). Clean EOF ends the connection
///    with `Ok(())`; a frame that fails to parse gets a single `error`
///    frame (empty `id`) and then the connection is closed.
/// 2. `Request::resolve()` — apply defaults, validate. Failures → `error`
///    frame with `code: invalid_request`; the connection stays open.
/// 3. Admission gate (when `ctx.admission` is `Some`). A full queue →
///    `error` frame with `code: queue_full`; a closed gate → `error`
///    frame with `code: backend_unavailable` and the connection closes.
/// 4. `router.dispatch()` — pick a backend. No backend → `error` frame
///    with `code: backend_unavailable`.
/// 5. `backend.generate()` — pre-stream errors → `error` frame whose code
///    follows the `GenerateError` variant (`invalid_request`,
///    `backend_unavailable` or `internal`). Every variant except
///    `InvalidRequest` is also recorded as a backend failure.
/// 6. Stream `TokenEvent`s, translating each to `Response::Token` /
///    `Response::Done`. If the engine drops the stream without `Done`,
///    emit `error` with `code: backend_unavailable` and record a backend
///    failure.
///
/// # Errors
///
/// Returns the underlying `io::Error` when reading a frame fails at the
/// I/O level or when writing any response frame fails. Protocol-level
/// problems are reported to the peer as `error` frames instead.
pub async fn handle_connection<C: Connection + 'static>(
    mut conn: C,
    router: Arc<Router>,
    peer: PeerIdentity,
    ctx: AcceptContext,
) -> Result<(), io::Error> {
    let transport = conn.transport();
    info!(
        target: "inferd_daemon::activity",
        transport = transport,
        peer = %peer,
        peer_uid = peer.uid,
        peer_pid = peer.pid,
        peer_sid = peer.sid.as_deref(),
        "connection_accepted"
    );

    // The `BufReader` below owns the read side for the whole connection,
    // so responses go out through a separate write half. Requests are not
    // pipelined: each one is streamed to completion before the next frame
    // is read.
    let (read_half, write_half) = tokio::io::split(&mut conn);
    let mut reader = BufReader::with_capacity(64 * 1024, read_half);
    let writer = Arc::new(Mutex::new(write_half));

    // F-8: TCP first-frame auth. UDS / pipe rely on F-7 peer creds and
    // skip this. Anonymous probers see the connection close with no
    // protocol error frame — we don't confirm endpoint existence.
    if transport == "tcp"
        && let Some(expected) = ctx.expected_api_key.as_deref()
    {
        match read_auth_frame(&mut reader).await {
            Some(frame) if key_matches(&frame.key, expected) => {
                debug!(transport, "tcp auth ok");
            }
            _ => {
                warn!(
                    target: "inferd_daemon::activity",
                    peer = %peer,
                    "tcp_auth_rejected"
                );
                return Ok(());
            }
        }
    }

    loop {
        // Read one request frame through the same buffered reader the
        // auth step used, so bytes prefetched past a line are not lost.
        let request: Request = match read_frame_async(&mut reader).await {
            Ok(Some(r)) => r,
            Ok(None) => return Ok(()), // peer closed cleanly
            Err(ProtoError::Io(e)) => return Err(e),
            Err(e) => {
                // The frame could not be parsed, so there is no request id
                // to echo. Report once and close the connection.
                let resp = Response::Error {
                    id: String::new(),
                    code: e.to_error_code(),
                    message: e.to_string(),
                };
                write_response(&writer, &resp).await?;
                return Ok(());
            }
        };

        // Resolve: defaults + validation.
        let id = request.id.clone();
        let resolved = match request.resolve() {
            Ok(r) => r,
            Err(e) => {
                let resp = Response::Error {
                    id,
                    code: ErrorCode::InvalidRequest,
                    message: e.to_string(),
                };
                write_response(&writer, &resp).await?;
                continue;
            }
        };

        // Admission gate (queue_full enforcement). Held for the full
        // generation; dropping the permit (after the Done frame, on
        // mid-stream error, or on connection drop) returns the slot
        // to the pool. `None` admission = tests / dev paths that
        // don't care about queue depth.
        let _admit_permit = match ctx.admission.as_ref().map(|a| a.try_admit()) {
            None => None,
            Some(Ok(p)) => Some(p),
            Some(Err(SubmitError::QueueFull)) => {
                let resp = Response::Error {
                    id: resolved.id.clone(),
                    code: ErrorCode::QueueFull,
                    message: "queue full".into(),
                };
                write_response(&writer, &resp).await?;
                continue;
            }
            Some(Err(SubmitError::Closed)) => {
                // Admission closed = daemon shutting down. Tell the
                // caller, then drop the connection — there's no point
                // reading another request that we'll also reject.
                let resp = Response::Error {
                    id: resolved.id.clone(),
                    code: ErrorCode::BackendUnavailable,
                    message: "admission closed".into(),
                };
                write_response(&writer, &resp).await?;
                return Ok(());
            }
        };

        // Dispatch through the router.
        let dispatch = match router.dispatch() {
            Ok(d) => d,
            Err(RouterError::NoBackends) | Err(RouterError::NoneAvailable) => {
                let resp = Response::Error {
                    id: resolved.id.clone(),
                    code: ErrorCode::BackendUnavailable,
                    message: "no backend available".into(),
                };
                write_response(&writer, &resp).await?;
                continue;
            }
        };
        let backend_name = dispatch.name.clone();
        let backend = dispatch.backend;
        let req_id = resolved.id.clone();

        // Generate. Pre-stream errors count toward the breaker per
        // ADR 0007 — InvalidRequest does not (it's a caller bug, not
        // a backend health signal).
        let mut stream = match backend.generate(resolved).await {
            Ok(s) => s,
            Err(e) => {
                let (code, message, is_backend_failure) = match e {
                    GenerateError::InvalidRequest(m) => (ErrorCode::InvalidRequest, m, false),
                    GenerateError::NotReady => (
                        ErrorCode::BackendUnavailable,
                        "backend not ready".into(),
                        true,
                    ),
                    GenerateError::Unavailable(m) => (ErrorCode::BackendUnavailable, m, true),
                    GenerateError::Internal(m) => (ErrorCode::Internal, m, true),
                };
                if is_backend_failure {
                    router.record_failure(&backend_name);
                }
                let resp = Response::Error {
                    id: req_id,
                    code,
                    message,
                };
                write_response(&writer, &resp).await?;
                continue;
            }
        };

        // Stream tokens. Build the full content for Response::Done in one
        // pass; the engine reports usage so we don't have to count.
        let mut full = String::new();
        let mut terminal_emitted = false;
        while let Some(ev) = stream.next().await {
            match ev {
                TokenEvent::Token(text) => {
                    // Append to the accumulator first so the token string
                    // can be moved into the frame instead of cloned. If
                    // the write below fails we return and `full` is
                    // discarded, so the ordering is not observable.
                    full.push_str(&text);
                    let frame = Response::Token {
                        id: req_id.clone(),
                        content: text,
                    };
                    write_response(&writer, &frame).await?;
                }
                TokenEvent::Done { stop_reason, usage } => {
                    let frame = Response::Done {
                        id: req_id.clone(),
                        content: std::mem::take(&mut full),
                        usage,
                        stop_reason,
                        backend: backend_name.clone(),
                    };
                    write_response(&writer, &frame).await?;
                    info!(
                        target: "inferd_daemon::activity",
                        req_id = %req_id,
                        backend = %backend_name,
                        stop_reason = ?stop_reason,
                        prompt_tokens = usage.prompt_tokens,
                        completion_tokens = usage.completion_tokens,
                        "request_done"
                    );
                    router.record_success(&backend_name);
                    terminal_emitted = true;
                    break;
                }
            }
        }

        if !terminal_emitted {
            // Mid-stream backend failure (no Done event). Report and move
            // to next request on the same connection. Counts toward the
            // breaker per ADR 0007.
            warn!(
                target: "inferd_daemon::activity",
                req_id = %req_id,
                backend = %backend_name,
                "request_error_mid_stream"
            );
            router.record_failure(&backend_name);
            let frame = Response::Error {
                id: req_id,
                code: ErrorCode::BackendUnavailable,
                message: "backend ended stream without terminal frame".into(),
            };
            write_response(&writer, &frame).await?;
        }
    }
}

/// Pull a single newline-terminated line out of `reader` and try to
/// decode it as an `AuthFrame`.
///
/// Every failure mode collapses to `None` — read error, EOF before a
/// newline, a line longer than `inferd_proto::MAX_FRAME_BYTES`, or
/// whatever `AuthFrame::from_json` rejects — so the caller can drop the
/// connection without revealing why.
///
/// The line terminator is consumed from `reader` but not handed to the
/// parser. Bytes after the newline stay in the caller's buffer, which is
/// why this works on the connection's own reader rather than a fresh
/// `BufReader` wrapper (a wrapper would swallow prefetched bytes when it
/// dropped).
async fn read_auth_frame<R>(reader: &mut R) -> Option<AuthFrame>
where
    R: tokio::io::AsyncBufRead + Unpin,
{
    use tokio::io::AsyncBufReadExt;

    let max_len = inferd_proto::MAX_FRAME_BYTES;
    let mut collected: Vec<u8> = Vec::with_capacity(256);
    loop {
        let chunk = reader.fill_buf().await.ok()?;
        if chunk.is_empty() {
            // EOF before the line was terminated.
            return None;
        }

        // `payload` = bytes of this chunk that belong to the line;
        // `consumed` = bytes to drop from the reader (payload plus the
        // newline itself when one was found).
        let newline_at = chunk.iter().position(|&byte| byte == b'\n');
        let (payload, consumed) = match newline_at {
            Some(idx) => (idx, idx + 1),
            None => (chunk.len(), chunk.len()),
        };

        if collected.len() + payload > max_len {
            return None;
        }
        collected.extend_from_slice(&chunk[..payload]);
        reader.consume(consumed);

        if newline_at.is_some() {
            return AuthFrame::from_json(&collected);
        }
    }
}

/// Read one request frame from a tokio buffered reader by collecting a
/// line and handing it to the synchronous `inferd_proto::read_frame`.
///
/// The line is gathered directly from the caller's `AsyncBufRead`
/// (normally the per-connection `BufReader`), so bytes prefetched beyond
/// the newline remain available for the next call. Layering a second
/// `BufReader` here would discard them when it went out of scope.
///
/// Outcomes:
/// - `Ok(None)` — EOF with nothing buffered.
/// - EOF after a partial line — the unterminated bytes are passed to the
///   proto crate's reader, which decides how to treat a final line
///   without a newline.
/// - `Err(ProtoError::FrameTooLarge)` — the line (excluding its newline)
///   would exceed `inferd_proto::MAX_FRAME_BYTES`.
/// - otherwise whatever `inferd_proto::read_frame` returns for the line,
///   newline included.
async fn read_frame_async<R>(reader: &mut R) -> Result<Option<Request>, ProtoError>
where
    R: tokio::io::AsyncBufRead + Unpin,
{
    use tokio::io::AsyncBufReadExt;

    let max_len = inferd_proto::MAX_FRAME_BYTES;
    let mut pending: Vec<u8> = Vec::with_capacity(512);
    loop {
        let chunk = reader.fill_buf().await?;
        if chunk.is_empty() {
            return if pending.is_empty() {
                Ok(None)
            } else {
                // Unterminated final line: let the sync reader judge it.
                inferd_proto::read_frame::<&[u8], Request>(&mut &pending[..])
            };
        }

        // `payload` counts toward the size limit; `consumed` additionally
        // covers the newline, which is kept in the line for the parser.
        let newline_at = chunk.iter().position(|&byte| byte == b'\n');
        let (payload, consumed) = match newline_at {
            Some(idx) => (idx, idx + 1),
            None => (chunk.len(), chunk.len()),
        };

        if pending.len() + payload > max_len {
            return Err(ProtoError::FrameTooLarge);
        }
        pending.extend_from_slice(&chunk[..consumed]);
        reader.consume(consumed);

        if newline_at.is_some() {
            return inferd_proto::read_frame::<&[u8], Request>(&mut &pending[..]);
        }
    }
}

/// Serialise `resp` as one frame and send it through the shared writer.
///
/// The frame is encoded into a scratch buffer before the lock is taken,
/// so the mutex is only held for the write and flush. A serialisation
/// failure is reported as an `io::Error` with the message
/// `serialise response: …`; write and flush errors are returned as-is.
async fn write_response<W: AsyncWrite + Unpin>(
    writer: &Mutex<W>,
    resp: &Response,
) -> io::Result<()> {
    let mut encoded = Vec::with_capacity(512);
    if let Err(e) = write_frame(&mut encoded, resp) {
        return Err(io::Error::other(format!("serialise response: {e}")));
    }

    let mut sink = writer.lock().await;
    sink.write_all(&encoded).await?;
    sink.flush().await
}

/// Serve a TCP listener: accept loop, spawn one task per connection.
///
/// Returns `Ok(())` when `shutdown` resolves (e.g. a Ctrl-C signal).
/// The `_` pattern matches any outcome of the receiver, so this
/// presumably also fires when the sender is dropped without sending.
///
/// Connection tasks are detached `tokio::spawn`s and are not aborted
/// here; they end on their own or when the runtime shuts down. A client
/// cut off that way sees EOF and treats it as a non-terminal-frame error
/// per `docs/protocol-v1.md`.
///
/// # Errors
///
/// Returns the `io::Error` from `local_addr()` or from `accept()`, with
/// one exception: accept failures that concern only the peer being
/// accepted (`ConnectionAborted`, `ConnectionReset`,
/// `ConnectionRefused`) are logged and skipped, so one client that
/// resets during the handshake cannot take the listener down.
pub async fn serve_tcp(
    listener: tokio::net::TcpListener,
    router: Arc<Router>,
    ctx: AcceptContext,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    info!(addr = ?listener.local_addr()?, "tcp listener accepting");
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("shutdown signalled");
                return Ok(());
            }
            accept = listener.accept() => {
                let (stream, peer_addr) = match accept {
                    Ok(accepted) => accepted,
                    // Per-connection failure: the listener itself is
                    // still healthy. Each such error corresponds to one
                    // failed peer, so retrying at once cannot spin.
                    Err(e) if matches!(
                        e.kind(),
                        io::ErrorKind::ConnectionAborted
                            | io::ErrorKind::ConnectionReset
                            | io::ErrorKind::ConnectionRefused
                    ) => {
                        warn!(error = %e, "tcp accept failed for one peer; continuing");
                        continue;
                    }
                    // Anything else (fd exhaustion, listener closed, …) is
                    // surfaced to the caller exactly as before.
                    Err(e) => return Err(e),
                };
                let r = Arc::clone(&router);
                let peer = PeerIdentity::from_tcp(peer_addr);
                let ctx = ctx.clone();
                debug!(?peer_addr, "tcp accept");
                tokio::spawn(async move {
                    if let Err(e) = handle_connection(stream, r, peer, ctx).await {
                        warn!(error = ?e, "connection terminated with error");
                    }
                });
            }
        }
    }
}

/// Accept loop for a Unix domain socket listener (Unix only).
///
/// Every accepted stream is handed to its own spawned
/// `handle_connection` task together with a clone of `ctx` and the
/// shared router. The loop ends with `Ok(())` once `shutdown` resolves,
/// and with `Err` if `accept()` fails.
#[cfg(unix)]
pub async fn serve_uds(
    listener: tokio::net::UnixListener,
    router: Arc<Router>,
    ctx: AcceptContext,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    info!("uds listener accepting");
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("shutdown signalled");
                return Ok(());
            }
            accept = listener.accept() => {
                let (stream, _) = accept?;

                // Peer credentials are best-effort. Should the lookup
                // fail (very rare on a connected UDS), the request is
                // still served under an empty identity — the socket ACL
                // is the primary perimeter and this is defence in depth.
                let peer = match crate::peercred::unix::from_stream(&stream) {
                    Ok(identity) => identity,
                    Err(e) => {
                        warn!(error = %e, "SO_PEERCRED failed; recording empty unix identity");
                        PeerIdentity {
                            uid: None,
                            gid: None,
                            pid: None,
                            sid: None,
                            remote_addr: None,
                            transport: "unix",
                        }
                    }
                };
                debug!(?peer, "uds accept");

                let conn_router = Arc::clone(&router);
                let conn_ctx = ctx.clone();
                tokio::spawn(async move {
                    let outcome = handle_connection(stream, conn_router, peer, conn_ctx).await;
                    if let Err(e) = outcome {
                        warn!(error = ?e, "connection terminated with error");
                    }
                });
            }
        }
    }
}

/// Accept loop for a Windows named pipe (Windows only).
///
/// The caller binds the first pipe instance itself — via
/// `crate::endpoint::bind_named_pipe(path, true)` — and passes it in as
/// `first_instance`. Binding up front means the pipe exists before the
/// path is handed to any client or test harness, which removes the race
/// where a client connects between `tokio::spawn(serve_named_pipe)` and
/// the first bind inside the loop.
///
/// Each iteration:
/// 1. Awaits `connect()` on the listening instance — the accept point.
/// 2. Binds the next instance (`first = false`) and swaps it in, so
///    another client can connect immediately.
/// 3. Hands the connected instance to a per-connection task.
///
/// Returns `Ok(())` once `shutdown` resolves, or `Err` if `connect()` or
/// binding the next instance fails.
#[cfg(windows)]
pub async fn serve_named_pipe(
    path: &str,
    first_instance: tokio::net::windows::named_pipe::NamedPipeServer,
    router: Arc<Router>,
    ctx: AcceptContext,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    use crate::endpoint::bind_named_pipe;

    info!(path = %path, "named pipe listener accepting");
    let mut listening = first_instance;
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("shutdown signalled");
                return Ok(());
            }
            connect_result = listening.connect() => {
                connect_result?;

                // Stand up the replacement listener before spawning the
                // handler, then take the connected instance out of the
                // slot it occupied.
                let next_instance = bind_named_pipe(path, false)?;
                let connected = std::mem::replace(&mut listening, next_instance);

                // Peer identity is best-effort. If the lookup fails
                // (caller process exited between accept and probe), the
                // client is served under an empty identity; the pipe's
                // DACL is the primary perimeter, this is defence in depth.
                let peer = match crate::peercred::windows::from_stream(&connected) {
                    Ok(identity) => identity,
                    Err(e) => {
                        warn!(error = %e, "GetNamedPipeClientProcessId failed; empty pipe identity");
                        PeerIdentity {
                            uid: None,
                            gid: None,
                            pid: None,
                            sid: None,
                            remote_addr: None,
                            transport: "pipe",
                        }
                    }
                };
                debug!(?peer, "named pipe accept");

                let conn_router = Arc::clone(&router);
                let conn_ctx = ctx.clone();
                tokio::spawn(async move {
                    let outcome = handle_connection(connected, conn_router, peer, conn_ctx).await;
                    if let Err(e) = outcome {
                        warn!(error = ?e, "connection terminated with error");
                    }
                });
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use inferd_engine::mock::Mock;

    /// With a freshly constructed mock backend the wait should return
    /// well inside the first poll window.
    #[tokio::test]
    async fn wait_for_ready_returns_when_already_ready() {
        let budget = Duration::from_secs(1);
        let router = Router::new(vec![Arc::new(Mock::new())]);

        let waited = wait_for_ready(&router, budget).await.unwrap();

        assert!(waited < Duration::from_millis(100));
    }

    /// A backend that never becomes ready must surface `ReadyTimeout`,
    /// whose message mentions "not ready".
    #[tokio::test]
    async fn wait_for_ready_times_out_when_not_ready() {
        let backend = Arc::new(Mock::new());
        backend.set_ready(false);
        let router = Router::new(vec![backend]);

        let outcome = wait_for_ready(&router, Duration::from_millis(100)).await;

        let rendered = outcome.unwrap_err().to_string();
        assert!(rendered.contains("not ready"));
    }

    /// Readiness flipped from a background task after 150ms is picked up
    /// by a later poll, so at least 100ms must have elapsed.
    #[tokio::test]
    async fn wait_for_ready_succeeds_after_delayed_ready() {
        let backend = Arc::new(Mock::new());
        backend.set_ready(false);
        let router = Router::new(vec![backend.clone()]);

        let flipper = Arc::clone(&backend);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(150)).await;
            flipper.set_ready(true);
        });

        let budget = Duration::from_secs(1);
        let waited = wait_for_ready(&router, budget).await.unwrap();

        assert!(waited >= Duration::from_millis(100));
    }
}