// JetStream RPC — Server Loop
// Copyright (c) 2024, Sevki <s@sevki.io>
// SPDX-License-Identifier: BSD-3-Clause
// r[impl jetstream.rpc.swift.server-loop]
import Foundation
import JetStreamWireFormat
/// A handler dispatch function that processes a request frame and returns a
/// response frame. Generated by the codegen for each service protocol.
///
/// The closure is `@Sendable` and `async`, so it can be invoked from
/// concurrently running tasks. It is not `throws`: every call yields a
/// `Frame<Res>`.
/// NOTE(review): handler failures are presumably encoded inside the returned
/// frame — confirm against the generated dispatchers.
///
/// - `Req`: type parameter of the incoming request `Frame`.
/// - `Res`: type parameter of the returned response `Frame`.
public typealias DispatchFn<Req: Framer, Res: Framer> =
@Sendable (Context, Frame<Req>) async -> Frame<Res>
/// Serve one transport stream until its request sequence is exhausted.
///
/// Each frame yielded by `transport.receive()` is handed to `dispatch` inside
/// its own child task of a throwing task group, so requests are processed
/// concurrently. A child sends its response through `transport.send` as soon
/// as `dispatch` returns, which means responses go out in completion order
/// rather than arrival order.
///
/// Note the generic arguments of `dispatch`: it consumes `Frame<T.TRes>` (what
/// this transport receives) and produces `Frame<T.TReq>` (what it sends).
///
/// - Parameters:
///   - transport: Stream to read request frames from and write responses to.
///   - ctx: Context passed unchanged to every `dispatch` invocation.
///   - dispatch: Handler that turns a request frame into a response frame.
/// - Throws: An error raised while iterating `transport.receive()`, or an
///   error thrown by `transport.send` in a child task. Child errors are only
///   observed by the final `waitForAll()`, i.e. after the request sequence
///   has ended.
public func serve<T: Transport>(
    transport: T,
    ctx: Context,
    dispatch: @escaping DispatchFn<T.TRes, T.TReq>
) async throws where T.TReq: Framer, T.TRes: Framer {
    try await withThrowingTaskGroup(of: Void.self) { inFlight in
        // Drive the request sequence by hand; this is what `for try await`
        // expands to.
        var requests = transport.receive().makeAsyncIterator()
        while let request = try await requests.next() {
            inFlight.addTask {
                // Dispatch first, then push the resulting frame back out.
                try await transport.send(dispatch(ctx, request))
            }
        }
        // The stream has ended: let every outstanding dispatch finish (and
        // surface any error it threw) before returning.
        try await inFlight.waitForAll()
    }
}
// r[impl jetstream.rpc.swift.server-loop.transport]
/// Accept incoming bidirectional streams and run a server loop on each.
///
/// `acceptBi` is called repeatedly until it returns `nil`; each accepted
/// stream is served concurrently in its own child task via `serve`. On
/// transports that provide only a single connection stream, pass the stream
/// directly to `serve()` instead.
///
/// - Parameters:
///   - acceptBi: Produces the next incoming stream, or `nil` when no more
///     streams will be accepted.
///   - ctx: Context forwarded unchanged to every `serve` call.
///   - dispatch: Handler forwarded unchanged to every `serve` call.
/// - Throws: Any error thrown by `acceptBi`, or, once accepting has finished,
///   an error thrown by one of the per-stream `serve` tasks.
public func acceptAndServe<T: Transport>(
    acceptBi: @escaping () async throws -> T?,
    ctx: Context,
    dispatch: @escaping DispatchFn<T.TRes, T.TReq>
) async throws where T.TReq: Framer, T.TRes: Framer {
    try await withThrowingTaskGroup(of: Void.self) { group in
        while let stream = try await acceptBi() {
            group.addTask {
                try await serve(transport: stream, ctx: ctx, dispatch: dispatch)
            }
        }
        // A throwing task group only rethrows child errors when its results
        // are consumed. Without this call, failures from per-stream `serve`
        // tasks would be dropped silently when the closure returns. This
        // mirrors what `serve` does for its own dispatch tasks.
        try await group.waitForAll()
    }
}