import Foundation
{%- if let Some(ffi_module) = ffi_module_name %}
import {{ ffi_module }}
typealias BoltFFICallbackHandle = {{ ffi_module }}.BoltFFICallbackHandle
{%- endif %}
public struct FfiError: Error {
public let message: String
public init(message: String) { self.message = message }
{%- if let Some(ffi_module) = ffi_module_name %}
public init(fromC c: {{ ffi_module }}.FfiError) {
self.message = stringFromFfi(c.message)
}
{%- endif %}
}
@inline(__always)
private func stringFromFfi(_ ffiString: FfiString) -> String {
guard ffiString.len > 0, let pointer = ffiString.ptr else { return "" }
return String(decoding: UnsafeBufferPointer(start: pointer, count: Int(ffiString.len)), as: UTF8.self)
}
@inline(__always)
private func takeLastErrorMessage() -> String {
var out = FfiString()
ensureOk({{ prefix }}_last_error_message(&out))
defer { {{ prefix }}_free_string(out) }
return stringFromFfi(out)
}
@inline(__always)
private func checkStatus(_ status: FfiStatus, context: StaticString = #function) throws {
guard status.code == 0 else {
throw FfiError(message: "FFI failed in \(context) with code \(status.code)")
}
}
@inline(__always)
private func ensureOk(_ status: FfiStatus, context: StaticString = #function) {
guard status.code == 0 else {
fatalError("FFI failed in \(context) [\(status.code)]")
}
}
{%- if has_async %}
final class FfiFutureState<T>: @unchecked Sendable {
typealias Continuation = CheckedContinuation<T, Error>
enum FinishDecision {
case alreadyFinished
case finishWithoutContinuation
case finishWithContinuation(Continuation)
}
final class ContinuationBox {
let continuation: Continuation
init(_ continuation: Continuation) { self.continuation = continuation }
}
let handle: RustFutureHandle?
private var continuationSlot: UInt64 = 0
init(handle: RustFutureHandle?) {
self.handle = handle
}
func installContinuation(_ continuation: Continuation) -> Bool {
let box = ContinuationBox(continuation)
let raw = UInt64(UInt(bitPattern: Unmanaged.passRetained(box).toOpaque()))
let prior = withUnsafeMutablePointer(to: &continuationSlot) { {{ prefix }}_atomic_u64_exchange($0, raw) }
if prior == 0 { return true }
if prior == 1 {
Unmanaged.passUnretained(box).release()
return false
}
withUnsafeMutablePointer(to: &continuationSlot) { _ = {{ prefix }}_atomic_u64_exchange($0, prior) }
Unmanaged.passUnretained(box).release()
return false
}
@inline(__always)
func canPoll() -> Bool {
withUnsafeMutablePointer(to: &continuationSlot) { {{ prefix }}_atomic_u64_load($0) } != 1
}
func decideFinish() -> FinishDecision {
let prior = withUnsafeMutablePointer(to: &continuationSlot) { {{ prefix }}_atomic_u64_exchange($0, 1) }
if prior == 1 { return .alreadyFinished }
if prior == 0 { return .finishWithoutContinuation }
let box = Unmanaged<ContinuationBox>.fromOpaque(UnsafeRawPointer(bitPattern: UInt(prior))!).takeRetainedValue()
return .finishWithContinuation(box.continuation)
}
}
{%- endif %}
@usableFromInline protocol WireCodable {
static func decode(from reader: inout WireReader) -> Self
func encode(to writer: inout WireWriter)
}
@usableFromInline struct WireReader {
@usableFromInline let data: Data
@usableFromInline var position: Int
@inlinable init(data: Data, position: Int = 0) { self.data = data; self.position = position }
@inlinable init(ptr: UnsafePointer<UInt8>, len: Int) { self.data = Data(bytes: ptr, count: len); self.position = 0 }
@inlinable var remaining: Int { data.count - position }
@inlinable var bytesRead: Int { position }
@inlinable mutating func readU8() -> UInt8 { let v = data[position]; position += 1; return v }
@inlinable mutating func readI8() -> Int8 { Int8(bitPattern: readU8()) }
@inlinable mutating func readU16() -> UInt16 { let v = data.withUnsafeBytes { $0.loadUnaligned(fromByteOffset: position, as: UInt16.self) }; position += 2; return v }
@inlinable mutating func readI16() -> Int16 { Int16(bitPattern: readU16()) }
@inlinable mutating func readU32() -> UInt32 { let v = data.withUnsafeBytes { $0.loadUnaligned(fromByteOffset: position, as: UInt32.self) }; position += 4; return v }
@inlinable mutating func readI32() -> Int32 { Int32(bitPattern: readU32()) }
@inlinable mutating func readU64() -> UInt64 { let v = data.withUnsafeBytes { $0.loadUnaligned(fromByteOffset: position, as: UInt64.self) }; position += 8; return v }
@inlinable mutating func readI64() -> Int64 { Int64(bitPattern: readU64()) }
@inlinable mutating func readF32() -> Float { Float(bitPattern: readU32()) }
@inlinable mutating func readF64() -> Double { Double(bitPattern: readU64()) }
@inlinable mutating func readBool() -> Bool { readU8() != 0 }
@inlinable mutating func readString() -> String {
let len = Int(readU32())
guard len > 0 else { return "" }
let start = position
position += len
return String(decoding: data[start..<(start + len)], as: UTF8.self)
}
@inlinable mutating func readBytes() -> Data {
let len = Int(readU32())
guard len > 0 else { return Data() }
let start = position
position += len
return data[start..<(start + len)]
}
@inlinable mutating func readDuration() -> TimeInterval {
let seconds = readU64()
let nanoseconds = readU32()
return Double(seconds) + (Double(nanoseconds) / 1.0e9)
}
@inlinable mutating func readTimestamp() -> Date {
let seconds = readI64()
let nanoseconds = readU32()
let delta = seconds >= 0
? (Double(seconds) + (Double(nanoseconds) / 1.0e9))
: (Double(seconds) - (Double(nanoseconds) / 1.0e9))
return Date(timeIntervalSince1970: delta)
}
@inlinable mutating func readUuid() -> UUID {
let hi = readU64()
let lo = readU64()
var uuid: uuid_t = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
var hiBe = hi.bigEndian
var loBe = lo.bigEndian
withUnsafeMutableBytes(of: &uuid) { uuidBytes in
Swift.withUnsafeBytes(of: &hiBe) { uuidBytes[0..<8].copyBytes(from: $0) }
Swift.withUnsafeBytes(of: &loBe) { uuidBytes[8..<16].copyBytes(from: $0) }
}
return UUID(uuid: uuid)
}
@inlinable mutating func readUrl() -> URL {
guard let url = URL(string: readString()) else { fatalError("Invalid URL") }
return url
}
@inlinable mutating func readOptional<T>(_ body: (inout WireReader) -> T) -> T? {
guard readU8() != 0 else { return nil }
return body(&self)
}
@inlinable mutating func readArray<T>(_ body: (inout WireReader) -> T) -> [T] {
let count = Int(readU32())
guard count > 0 else { return [] }
var result = [T]()
result.reserveCapacity(count)
for _ in 0..<count { result.append(body(&self)) }
return result
}
@inlinable mutating func readBlittable<T>() -> T {
let v = data.withUnsafeBytes { $0.loadUnaligned(fromByteOffset: position, as: T.self) }
position += MemoryLayout<T>.stride
return v
}
@inlinable mutating func readBlittableArray<T>() -> [T] {
let count = Int(readU32())
guard count > 0 else { return [] }
let byteCount = count * MemoryLayout<T>.stride
let start = position
position += byteCount
return data.withUnsafeBytes { rawBuffer in
Array<T>(unsafeUninitializedCapacity: count) { buffer, initialized in
memcpy(buffer.baseAddress!, rawBuffer.baseAddress!.advanced(by: start), byteCount)
initialized = count
}
}
}
@inlinable mutating func read<T: WireCodable>() -> T {
T.decode(from: &self)
}
}
@usableFromInline struct WireWriter {
@usableFromInline var data: Data
@inlinable init(capacity: Int = 64) { self.data = Data(capacity: capacity) }
@inlinable mutating func writeU8(_ v: UInt8) { data.append(v) }
@inlinable mutating func writeI8(_ v: Int8) { writeU8(UInt8(bitPattern: v)) }
@inlinable mutating func writeU16(_ v: UInt16) { var val = v; Swift.withUnsafeBytes(of: &val) { data.append(contentsOf: $0) } }
@inlinable mutating func writeI16(_ v: Int16) { writeU16(UInt16(bitPattern: v)) }
@inlinable mutating func writeU32(_ v: UInt32) { var val = v; Swift.withUnsafeBytes(of: &val) { data.append(contentsOf: $0) } }
@inlinable mutating func writeI32(_ v: Int32) { writeU32(UInt32(bitPattern: v)) }
@inlinable mutating func writeU64(_ v: UInt64) { var val = v; Swift.withUnsafeBytes(of: &val) { data.append(contentsOf: $0) } }
@inlinable mutating func writeI64(_ v: Int64) { writeU64(UInt64(bitPattern: v)) }
@inlinable mutating func writeF32(_ v: Float) { writeU32(v.bitPattern) }
@inlinable mutating func writeF64(_ v: Double) { writeU64(v.bitPattern) }
@inlinable mutating func writeBool(_ v: Bool) { writeU8(v ? 1 : 0) }
@inlinable mutating func writeString(_ v: String) {
var s = v
s.withUTF8 { utf8 in
writeU32(UInt32(utf8.count))
data.append(contentsOf: utf8)
}
}
@inlinable mutating func writeBytes(_ v: Data) {
writeU32(UInt32(v.count))
data.append(contentsOf: v)
}
@inlinable mutating func writeDuration(_ v: TimeInterval) {
if v.rounded(.down) > Double(Int64.max) { fatalError("Duration overflow") }
if v < 0 { fatalError("Invalid duration") }
let seconds = UInt64(v)
let nanoseconds = UInt32((v - Double(seconds)) * 1.0e9)
writeU64(seconds)
writeU32(nanoseconds)
}
@inlinable mutating func writeTimestamp(_ v: Date) {
var delta = v.timeIntervalSince1970
var sign: Int64 = 1
if delta < 0 { sign = -1; delta = -delta }
if delta.rounded(.down) > Double(Int64.max) { fatalError("Timestamp overflow") }
let seconds = Int64(delta)
let nanoseconds = UInt32((delta - Double(seconds)) * 1.0e9)
writeI64(sign * seconds)
writeU32(nanoseconds)
}
@inlinable mutating func writeUuid(_ v: UUID) {
var uuid = v.uuid
let (hi, lo) = Swift.withUnsafeBytes(of: &uuid) { raw -> (UInt64, UInt64) in
let hiBe = raw.loadUnaligned(fromByteOffset: 0, as: UInt64.self)
let loBe = raw.loadUnaligned(fromByteOffset: 8, as: UInt64.self)
return (UInt64(bigEndian: hiBe), UInt64(bigEndian: loBe))
}
writeU64(hi)
writeU64(lo)
}
@inlinable mutating func writeUrl(_ v: URL) { writeString(v.absoluteString) }
@inlinable mutating func writeOptional<T>(_ v: T?, _ body: (inout WireWriter, T) -> Void) {
guard let value = v else { writeU8(0); return }
writeU8(1)
body(&self, value)
}
@inlinable mutating func writeArray<T>(_ v: [T], _ body: (inout WireWriter, T) -> Void) {
writeU32(UInt32(v.count))
for item in v { body(&self, item) }
}
@inlinable mutating func writeBlittable<T>(_ value: T) {
var copy = value
let size = MemoryLayout<T>.size
let stride = MemoryLayout<T>.stride
Swift.withUnsafeBytes(of: ©) { data.append(contentsOf: $0) }
if stride > size {
data.append(contentsOf: repeatElement(UInt8(0), count: stride - size))
}
}
@inlinable mutating func writeBlittableArray<T>(_ v: [T]) {
writeU32(UInt32(v.count))
v.withUnsafeBytes { data.append(contentsOf: $0) }
}
@inlinable mutating func write<T: WireCodable>(_ v: T) { v.encode(to: &self) }
@inlinable func finalize() -> Data { data }
}
@inlinable func boltffiInvokeWireCallback(
_ callback: (@convention(c) (UInt64, UnsafePointer<UInt8>?, UInt, FfiStatus) -> Void)?,
_ callbackData: UInt64,
_ encoded: Data,
_ status: FfiStatus
) {
encoded.withUnsafeBytes { buf in
callback?(callbackData, buf.baseAddress?.assumingMemoryBound(to: UInt8.self), UInt(buf.count), status)
}
}
@inlinable func boltffiDecodeOwnedBuf<T>(_ ptr: UnsafePointer<UInt8>?, _ len: Int, _ decode: (inout WireReader) -> T) -> T {
var reader = WireReader(ptr: ptr!, len: len)
return decode(&reader)
}
@inlinable func boltffiDecodeOwnedBuf<T>(_ ptr: UnsafePointer<UInt8>?, _ len: Int, _ decode: (inout WireReader) throws -> T) throws -> T {
var reader = WireReader(ptr: ptr!, len: len)
return try decode(&reader)
}
@inlinable func boltffiEncode(_ body: (inout WireWriter) -> Void) -> [UInt8] {
var writer = WireWriter()
body(&writer)
return [UInt8](writer.finalize())
}
{%- if has_streams %}
private enum StreamPollResult: Int8 {
case ready = 0
case closed = 1
}
final class WireStreamContextBase: @unchecked Sendable {
let subscription: SubscriptionHandle
let batchSize: UInt
let popBatch: (SubscriptionHandle, UInt) -> FfiBuf_u8
let poll: (SubscriptionHandle?, UInt64, StreamContinuationCallback?) -> Void
let unsubscribe: (SubscriptionHandle?) -> Void
let freeFn: (SubscriptionHandle?) -> Void
let freeBuf: (FfiBuf_u8) -> Void
let atomicCas: (UnsafeMutablePointer<UInt8>?, UInt8, UInt8) -> Bool
let processItems: (inout WireReader) -> Void
let finish: () -> Void
private var lifecycleTag: UInt8 = 0
private var callbackTag: UInt8 = 0
init(
subscription: SubscriptionHandle, batchSize: UInt,
popBatch: @escaping (SubscriptionHandle, UInt) -> FfiBuf_u8,
poll: @escaping (SubscriptionHandle?, UInt64, StreamContinuationCallback?) -> Void,
unsubscribe: @escaping (SubscriptionHandle?) -> Void,
freeFn: @escaping (SubscriptionHandle?) -> Void,
freeBuf: @escaping (FfiBuf_u8) -> Void,
atomicCas: @escaping (UnsafeMutablePointer<UInt8>?, UInt8, UInt8) -> Bool,
processItems: @escaping (inout WireReader) -> Void,
finish: @escaping () -> Void
) {
self.subscription = subscription; self.batchSize = batchSize
self.popBatch = popBatch; self.poll = poll; self.unsubscribe = unsubscribe
self.freeFn = freeFn; self.freeBuf = freeBuf; self.atomicCas = atomicCas
self.processItems = processItems; self.finish = finish
}
func start() { registerPoll() }
func requestTermination() {
let started = withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 0, 1) }
if started { unsubscribe(subscription); _ = withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 1, 2) } }
attemptFinalize()
}
private func attemptFinalize() {
guard (withUnsafeMutablePointer(to: &callbackTag) { atomicCas($0, 0, 0) }) else { return }
guard (withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 2, 3) }) else { return }
freeFn(subscription); finish()
}
private func schedulePoll() { Task { [self] in await Task.yield(); registerPoll() } }
private func registerPoll() {
guard (withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 0, 0) }) else { attemptFinalize(); return }
let data = UInt64(UInt(bitPattern: Unmanaged.passRetained(self).toOpaque()))
poll(subscription, data) { data, result in
Unmanaged<WireStreamContextBase>.fromOpaque(UnsafeRawPointer(bitPattern: UInt(data))!).takeRetainedValue().handlePoll(result)
}
}
private func handlePoll(_ pollResult: Int8) {
let isClosed = pollResult == StreamPollResult.closed.rawValue
guard (withUnsafeMutablePointer(to: &callbackTag) { atomicCas($0, 0, 1) }) else { attemptFinalize(); return }
defer { _ = withUnsafeMutablePointer(to: &callbackTag) { atomicCas($0, 1, 0) }; attemptFinalize() }
guard (withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 0, 0) }) else { return }
while true {
let buf = popBatch(subscription, batchSize)
guard buf.len > 0, let ptr = buf.ptr else { freeBuf(buf); break }
var reader = WireReader(ptr: ptr, len: Int(buf.len))
freeBuf(buf)
processItems(&reader)
}
if isClosed { requestTermination(); return }
guard (withUnsafeMutablePointer(to: &lifecycleTag) { atomicCas($0, 0, 0) }) else { return }
schedulePoll()
}
}
{%- endif %}