mobiler 0.31.0

Build mobile apps in Rust — one core, native UI on Android, iOS, and the web (CLI)
import Foundation
import SharedTypes

// WebSocket (free, bundled). A persistent connection backed by URLSessionWebSocketTask (system
// framework — no package). Canonical usage rides the streaming primitive:
//   cx.subscribe("ws", "websocket", "stream", url, |r| Msg::Frame(r))  // opens + streams frames
//   cx.plugin("websocket", "send", "hello", ...)                       // send while subscribed
//   cx.unsubscribe("ws")                                               // closes the socket
// `stream` (op) opens the socket and emits a PluginResponse per incoming frame until the
// subscription is cancelled (ok:false, "closed" on close). The legacy request/response ops
// (connect/send/recv/close) remain for back-compat. The Plugins registry dispatches `subscribe`
// (streaming) + `handle` (request/response) to a shared actor so the socket survives across calls.
enum WebSocketPlugin {
    static func handle(op: String, input: String) async -> PluginResponse {
        await WebSocketConnection.shared.handle(op: op, input: input)
    }

    // Streaming entrypoint (cx.subscribe): `input` is the ws:// / wss:// URL. Emits each incoming
    // frame; runs until the subscription's Task is cancelled (cx.unsubscribe).
    static func subscribe(op: String, input: String, emit: @escaping @Sendable (PluginResponse) -> Void) async {
        await WebSocketConnection.shared.stream(url: input, emit: emit)
    }
}

// Holds the single connection. @MainActor-isolated for safe mutable state across the async ops.
@MainActor
private final class WebSocketConnection {
    static let shared = WebSocketConnection()
    private var task: URLSessionWebSocketTask?

    func handle(op: String, input: String) async -> PluginResponse {
        switch op {
        case "connect": return connect(input)
        case "send": return await send(input)
        case "recv": return await recv()
        case "close":
            task?.cancel(with: .goingAway, reason: nil)
            task = nil
            return PluginResponse(ok: true, output: "")
        default:
            return PluginResponse(ok: false, output: "unknown op '\(op)'")
        }
    }

    // Open the socket and emit one PluginResponse per incoming frame until the enclosing Task is
    // cancelled (unsubscribe). The cancellation handler cancels the URLSession task, which makes the
    // in-flight `receive()` throw and ends the loop. `send` works against the same `task` meanwhile.
    func stream(url urlString: String, emit: @Sendable (PluginResponse) -> Void) async {
        guard let url = URL(string: urlString) else {
            emit(PluginResponse(ok: false, output: "invalid url"))
            return
        }
        // A dedicated session (not URLSession.shared) for the socket's lifetime.
        let session = URLSession(configuration: .default)
        let t = session.webSocketTask(with: url)
        task = t
        t.resume()
        await withTaskCancellationHandler {
            while !Task.isCancelled {
                do {
                    switch try await t.receive() {
                    case .string(let s): emit(PluginResponse(ok: true, output: s))
                    case .data(let d): emit(PluginResponse(ok: true, output: String(decoding: d, as: UTF8.self)))
                    // A control frame (ping/pong/etc.) — keep waiting, don't emit.
                    @unknown default: continue
                    }
                } catch {
                    // The socket closed or failed: report the real reason and STOP — `return`
                    // exits this loop unambiguously (no re-loop). Suppress on cancellation so an
                    // unsubscribe doesn't surface a spurious error.
                    if !Task.isCancelled {
                        emit(PluginResponse(ok: false, output: error.localizedDescription))
                    }
                    return
                }
            }
        } onCancel: {
            t.cancel(with: .goingAway, reason: nil)
        }
        session.invalidateAndCancel()
        if task === t { task = nil }
    }

    private func connect(_ urlString: String) -> PluginResponse {
        guard let url = URL(string: urlString) else {
            return PluginResponse(ok: false, output: "invalid url")
        }
        let t = URLSession.shared.webSocketTask(with: url)
        task = t
        t.resume()  // URLSessionWebSocketTask connects lazily; the first send/receive drives it.
        return PluginResponse(ok: true, output: "")
    }

    private func send(_ text: String) async -> PluginResponse {
        guard let task else { return PluginResponse(ok: false, output: "not connected") }
        do {
            try await task.send(.string(text))
            return PluginResponse(ok: true, output: "")
        } catch {
            return PluginResponse(ok: false, output: error.localizedDescription)
        }
    }

    private func recv() async -> PluginResponse {
        guard let task else { return PluginResponse(ok: false, output: "closed") }
        do {
            switch try await task.receive() {
            case .string(let s): return PluginResponse(ok: true, output: s)
            case .data(let d): return PluginResponse(ok: true, output: String(decoding: d, as: UTF8.self))
            @unknown default: return PluginResponse(ok: true, output: "")
            }
        } catch {
            // A receive error means the socket closed/failed → tell the app to stop its recv-loop.
            self.task = nil
            return PluginResponse(ok: false, output: "closed")
        }
    }
}