jetstream 16.0.0

Jetstream is an RPC framework for Rust, based on the 9P protocol and QUIC.
Documentation
// JetStream RPC — Server Codec
// Copyright (c) 2024, Sevki <s@sevki.io>
// SPDX-License-Identifier: BSD-3-Clause

// r[impl jetstream.rpc.swift.server-codec]

import Foundation
import JetStreamWireFormat

/// Server-side frame codec for a JetStream protocol `P`.
///
/// Decodes incoming bytes into `Frame<P.Request>` and encodes outgoing
/// `Frame<P.Response>` for writing. Frame boundaries are taken from the
/// 4-byte little-endian `size` prefix, which counts the whole frame
/// including the prefix itself.
public struct ServerCodec<P: JetStreamProtocol>: Sendable {
    public init() {}

    /// Decode request frames from an `AsyncSequence` of raw `Data` chunks.
    ///
    /// Incoming chunks are accumulated in an internal buffer. Whenever the
    /// buffer holds at least one complete frame (as announced by its size
    /// prefix), the frame is decoded and yielded; several frames contained in
    /// a single chunk are yielded in order.
    ///
    /// The returned stream finishes:
    /// - normally, when `source` ends;
    /// - with `FrameError.frameTooSmall` when a size prefix announces fewer
    ///   than 7 bytes;
    /// - with whatever error `source` or `Frame.decode(reader:)` throws.
    ///
    /// Terminating the returned stream cancels the task that reads `source`.
    ///
    /// - Parameter source: Asynchronous sequence of raw byte chunks; chunk
    ///   boundaries do not need to line up with frame boundaries.
    /// - Returns: A stream of decoded request frames.
    public func decodeFrames<S: AsyncSequence>(
        from source: S
    ) -> AsyncThrowingStream<Frame<P.Request>, Error> where S.Element == Data {
        AsyncThrowingStream { continuation in
            let task = Task {
                // Width of the little-endian size prefix that starts every frame.
                let prefixLength = MemoryLayout<UInt32>.size
                // Smallest size a frame may announce — presumably the size
                // prefix plus a fixed frame header; confirm against `Frame`.
                let minimumFrameSize: UInt32 = 7

                var buffer = Data()
                do {
                    for try await chunk in source {
                        buffer.append(chunk)

                        // Bytes at the front of `buffer` that already belong to
                        // frames decoded during this pass. Tracking an offset
                        // (instead of re-copying the remainder after every
                        // frame) keeps draining linear in the chunk size.
                        var consumed = 0

                        // Drain as many complete frames as possible.
                        while buffer.count - consumed >= prefixLength {
                            let rawSize: UInt32 = buffer.withUnsafeBytes {
                                $0.loadUnaligned(fromByteOffset: consumed, as: UInt32.self)
                            }
                            let declaredSize = UInt32(littleEndian: rawSize)

                            guard declaredSize >= minimumFrameSize else {
                                continuation.finish(throwing: FrameError.frameTooSmall(declaredSize))
                                return
                            }

                            // Compare in 64-bit space so that a huge declared size
                            // cannot trap when narrowed to `Int` on platforms
                            // where `Int` is 32 bits wide.
                            guard UInt64(buffer.count - consumed) >= UInt64(declaredSize) else { break }

                            // Safe: declaredSize <= available bytes <= Int.max.
                            let frameSize = Int(declaredSize)
                            let start = buffer.startIndex + consumed
                            // `subdata` yields a zero-based copy of exactly one frame.
                            let frameData = buffer.subdata(in: start..<(start + frameSize))

                            var reader = BinaryReader(data: frameData)
                            let frame = try Frame<P.Request>.decode(reader: &reader)
                            continuation.yield(frame)
                            consumed += frameSize
                        }

                        // Compact once per chunk, keeping only the bytes of a
                        // not-yet-complete trailing frame.
                        if consumed > 0 {
                            buffer = Data(buffer.dropFirst(consumed))
                        }
                    }
                    // NOTE(review): bytes of an incomplete trailing frame are
                    // dropped silently when `source` ends — confirm that a
                    // truncated stream should not be reported as an error.
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            continuation.onTermination = { _ in task.cancel() }
        }
    }

    /// Encode a response frame into raw bytes suitable for writing.
    ///
    /// - Parameter frame: The response frame to serialize.
    /// - Returns: The bytes produced by `frame.encode(writer:)`.
    /// - Throws: Any error thrown by `frame.encode(writer:)`.
    public func encode(_ frame: Frame<P.Response>) throws -> Data {
        // Pre-size the writer using the frame's own reported byte size.
        var writer = BinaryWriter(capacity: Int(frame.byteSize()))
        try frame.encode(writer: &writer)
        return writer.data
    }
}