jetstream 16.0.0

Jetstream is an RPC framework for Rust, based on the 9P protocol and QUIC.
Documentation
// JetStream RPC — Multiplexer
// Copyright (c) 2024, Sevki <s@sevki.io>
// SPDX-License-Identifier: BSD-3-Clause

// r[impl jetstream.rpc.swift.mux]

import Foundation
import JetStreamWireFormat

/// Client multiplexer for concurrent RPC calls.
///
/// Every outgoing request is stamped with a tag taken from the `TagPool`, and the
/// calling task is suspended until a response frame carrying the same tag is
/// received from the transport. `start()` must be called before responses can be
/// delivered; without it, calls to `rpc(_:)` suspend until `close()` is called.
public actor Mux<TReq: Framer, TRes: Framer> {
    /// Source of request tags; a tag is returned to the pool once its call completes.
    private let tagPool: TagPool
    /// In-flight calls, keyed by the tag of the request they are waiting on.
    private var pending: [UInt16: CheckedContinuation<Frame<TRes>, Error>] = [:]
    private let transport: any Transport<TReq, TRes>
    /// The running demux loop, if `start()` has been called.
    private var demuxTask: Task<Void, Never>?

    /// Creates a multiplexer on top of `transport`.
    ///
    /// - Parameters:
    ///   - transport: Transport used to send requests and receive responses.
    ///   - maxConcurrent: Forwarded to `TagPool`; presumably the upper bound on
    ///     simultaneously outstanding calls (confirm against `TagPool`).
    public init(transport: any Transport<TReq, TRes>, maxConcurrent: UInt16 = 256) {
        self.tagPool = TagPool(maxConcurrent: maxConcurrent)
        self.transport = transport
    }

    /// Start the demux loop (call once).
    ///
    /// Subsequent calls are ignored so that only one task ever consumes the
    /// transport's receive stream. When the stream fails, every in-flight call is
    /// resumed with the stream's error; when it finishes without an error, every
    /// in-flight call is resumed with `CancellationError`.
    public func start() async {
        guard demuxTask == nil else { return }
        demuxTask = Task {
            do {
                for try await frame in self.transport.receive() {
                    self.dispatch(frame)
                }
                // The stream ended cleanly: no further responses can arrive, so
                // callers still waiting must be released rather than left suspended.
                self.failAllPending(with: CancellationError())
            } catch {
                self.failAllPending(with: error)
            }
        }
    }

    /// Routes a received frame to the call waiting on its tag.
    ///
    /// Frames whose tag has no pending call are dropped.
    private func dispatch(_ frame: Frame<TRes>) {
        guard let continuation = pending.removeValue(forKey: frame.tag) else { return }
        releaseTag(frame.tag)
        continuation.resume(returning: frame)
    }

    /// Resumes every in-flight call with `error` and releases its tag.
    private func failAllPending(with error: Error) {
        guard !pending.isEmpty else { return }
        // Detach the table first so each continuation is resumed exactly once even
        // if another failure path runs afterwards.
        let orphaned = pending
        pending.removeAll()
        for (tag, continuation) in orphaned {
            releaseTag(tag)
            continuation.resume(throwing: error)
        }
    }

    /// Returns `tag` to the pool without blocking the synchronous caller.
    private func releaseTag(_ tag: UInt16) {
        Task { await tagPool.release(tag) }
    }

    /// Send a request and await the response.
    ///
    /// - Parameter msg: The request message to send.
    /// - Returns: The response frame whose tag matches the request.
    /// - Throws: `FrameError.frameTooSmall(0)` when no tag is available
    ///   (NOTE(review): this case is reused to signal tag exhaustion; a dedicated
    ///   error would be clearer but would change what callers catch), the
    ///   transport's error if sending fails or the receive stream fails, or
    ///   `CancellationError` if the multiplexer is closed or the receive stream
    ///   ends before a response arrives.
    public func rpc(_ msg: TReq) async throws -> Frame<TRes> {
        guard let tag = await tagPool.acquire() else {
            throw FrameError.frameTooSmall(0) // no tags available
        }

        return try await withCheckedThrowingContinuation { continuation in
            // Register before sending so a fast response cannot arrive unmatched.
            pending[tag] = continuation
            Task {
                do {
                    try await transport.send(Frame(tag: tag, msg: msg))
                } catch {
                    // The call may already have been completed by another path;
                    // only fail it if it is still pending.
                    if let cont = self.pending.removeValue(forKey: tag) {
                        self.releaseTag(tag)
                        cont.resume(throwing: error)
                    }
                }
            }
        }
    }

    /// Closes the transport and fails any calls still awaiting a response.
    ///
    /// After the transport has been closed, the demux task is cancelled and every
    /// in-flight call is resumed with `CancellationError`.
    ///
    /// - Throws: Whatever `transport.close()` throws; in that case in-flight calls
    ///   are left untouched.
    public func close() async throws {
        try await transport.close()
        demuxTask?.cancel()
        failAllPending(with: CancellationError())
    }
}