jetstream 16.0.0

Jetstream is an RPC framework for Rust, based on the 9P protocol and QUIC.
Documentation
// JetStream RPC — Server Loop
// Copyright (c) 2024, Sevki <s@sevki.io>
// SPDX-License-Identifier: BSD-3-Clause

// r[impl jetstream.rpc.swift.server-loop]

import Foundation
import JetStreamWireFormat

/// A handler dispatch function that processes a request frame and returns a
/// response frame. Generated by the codegen for each service protocol.
///
/// - `Req`: payload type of the incoming `Frame` handed to the handler.
/// - `Res`: payload type of the `Frame` the handler returns.
///
/// The closure is `@Sendable` and `async` but not `throws`: the signature
/// gives it no way to fail other than through the returned frame.
public typealias DispatchFn<Req: Framer, Res: Framer> =
    @Sendable (Context, Frame<Req>) async -> Frame<Res>

/// Serve requests arriving on a single transport stream until it ends.
///
/// Every frame yielded by `transport.receive()` is handed to `dispatch` in its
/// own child task of a throwing task group, so requests are handled
/// concurrently. Each child sends its response with `transport.send` as soon
/// as `dispatch` returns.
///
/// Note the order of the type arguments: `dispatch` consumes `Frame<T.TRes>`
/// and produces `Frame<T.TReq>` (presumably because the transport's
/// associated types are named from the client's point of view — confirm).
///
/// - Parameters:
///   - transport: The stream to read request frames from and write response
///     frames to.
///   - ctx: Context value passed unchanged to every `dispatch` call.
///   - dispatch: Handler that turns one request frame into one response frame.
/// - Throws: Any error raised while iterating `transport.receive()`, or an
///   error rethrown by `waitForAll()` from a child task's `transport.send`.
public func serve<T: Transport>(
    transport: T,
    ctx: Context,
    dispatch: @escaping DispatchFn<T.TRes, T.TReq>
) async throws where T.TReq: Framer, T.TRes: Framer {
    try await withThrowingTaskGroup(of: Void.self) { inFlight in
        // NOTE(review): child results are not consumed while this loop is
        // running, so a failed `send` is presumably only observed once the
        // request stream ends — confirm this is acceptable for long-lived
        // streams.
        for try await request in transport.receive() {
            inFlight.addTask {
                // Handle the request, then write the reply straight back.
                try await transport.send(dispatch(ctx, request))
            }
        }
        // The request stream is finished; let outstanding handlers complete
        // before returning.
        try await inFlight.waitForAll()
    }
}

// r[impl jetstream.rpc.swift.server-loop.transport]

/// Accept bidirectional streams in a loop and serve each one concurrently.
///
/// `acceptBi` is awaited repeatedly. Every stream it returns gets its own
/// child task running `serve(transport:ctx:dispatch:)`. The loop stops the
/// first time `acceptBi` returns `nil`. On transports that provide only a
/// single connection stream, pass the stream directly to `serve()` instead.
///
/// - Parameters:
///   - acceptBi: Produces the next accepted stream, or `nil` when no more
///     streams will arrive.
///   - ctx: Context value forwarded to every `serve` call.
///   - dispatch: Handler forwarded to every `serve` call.
/// - Throws: Any error thrown by `acceptBi`.
public func acceptAndServe<T: Transport>(
    acceptBi: @escaping () async throws -> T?,
    ctx: Context,
    dispatch: @escaping DispatchFn<T.TRes, T.TReq>
) async throws where T.TReq: Framer, T.TRes: Framer {
    try await withThrowingTaskGroup(of: Void.self) { streams in
        // NOTE(review): unlike `serve`, the group is never drained with
        // `waitForAll()`, so errors thrown by per-stream `serve` tasks do not
        // appear to be rethrown to the caller — confirm this is deliberate.
        while true {
            guard let accepted = try await acceptBi() else {
                // `nil` means the acceptor is done; stop taking new streams.
                break
            }
            streams.addTask {
                try await serve(transport: accepted, ctx: ctx, dispatch: dispatch)
            }
        }
    }
}