reasoninglayer 1.0.3

Rust client SDK for the Reasoning Layer API
Documentation
//! WebSocket support with automatic reconnection.
//!
//! The Reasoning Layer backend exposes three WebSocket endpoints for real-time streams:
//!
//! - **Cognitive agent events** — `GET /api/v1/cognitive/agents/ws`
//! - **Bidirectional cognitive cycle** — `GET /ws/cognitive/{agent_id}`
//! - **Oversight session stream** — `GET /ws/oversight/{session_id}`
//!
//! Build URLs via the helper methods on [`CognitiveClient`](crate::CognitiveClient) and
//! [`OversightClient`](crate::OversightClient), then connect with [`WebSocketConnection::connect`].
//!
//! # Reconnection policy
//!
//! On disconnect, [`WebSocketConnection::next_message`] transparently attempts to reconnect
//! with exponential backoff (1s → 2s → 4s → … capped at 30s) for up to 10 attempts; if every
//! attempt fails it returns `Ok(None)`. The `on_reconnect` callback, if set, is fired with the
//! 1-indexed attempt number before each attempt's backoff delay.
//!
//! # Example
//!
//! ```no_run
//! # async fn run() -> Result<(), reasoninglayer::Error> {
//! use reasoninglayer::{AuthConfig, ClientConfig, ReasoningLayerClient};
//! use reasoninglayer::ws::WebSocketConnection;
//!
//! let client = ReasoningLayerClient::new(
//!     ClientConfig::new(
//!         "https://platform.ovh.reasoninglayer.ai",
//!         "tenant-uuid",
//!         AuthConfig::Cookie,
//!     ),
//! )?;
//!
//! let url = client.cognitive().agent_events_url("agent-uuid")?;
//! let mut ws = WebSocketConnection::connect(&url).await?;
//! while let Some(event) = ws.next_message().await? {
//!     println!("{event:?}");
//! }
//! # Ok(()) }
//! ```

use std::time::Duration;

use futures_util::{SinkExt, StreamExt};
use serde_json::Value;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

use crate::error::Error;

/// Maximum number of connection attempts made in one reconnection cycle before giving up.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
/// Delay used for the first reconnection attempt; doubled for each subsequent attempt.
const BASE_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Upper bound applied to the (doubling) delay between reconnection attempts.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30);

/// Concrete stream type returned by `tokio_tungstenite::connect_async`.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

/// A WebSocket connection that transparently reconnects with exponential backoff.
pub struct WebSocketConnection {
    /// Endpoint URL, retained so the connection can be re-opened after a disconnect.
    url: String,
    /// The live stream, or `None` once a disconnect has been observed and no reconnection has
    /// succeeded yet.
    stream: Option<WsStream>,
    /// Optional hook invoked with the 1-indexed attempt number on every reconnection attempt.
    on_reconnect: Option<Box<dyn FnMut(u32) + Send>>,
}

/// Debug output shows the endpoint URL and whether a stream is currently held. The reconnect
/// callback is omitted because boxed closures are not `Debug`.
impl std::fmt::Debug for WebSocketConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_connected = self.stream.is_some();
        let mut out = f.debug_struct("WebSocketConnection");
        out.field("url", &self.url)
            .field("connected", &is_connected);
        out.finish()
    }
}

impl WebSocketConnection {
    /// Open a new WebSocket connection to `url`. Use the URL builders on
    /// [`CognitiveClient`](crate::CognitiveClient) and [`OversightClient`](crate::OversightClient)
    /// to construct `url`.
    pub async fn connect(url: &str) -> Result<Self, Error> {
        let stream = open(url).await?;
        Ok(Self {
            url: url.to_string(),
            stream: Some(stream),
            on_reconnect: None,
        })
    }

    /// Register a callback fired on each reconnection attempt. The callback receives the attempt
    /// number (1-indexed).
    pub fn on_reconnect<F>(&mut self, callback: F)
    where
        F: FnMut(u32) + Send + 'static,
    {
        self.on_reconnect = Some(Box::new(callback));
    }

    /// Wait for the next JSON message. On transient disconnect, this method will reconnect with
    /// exponential backoff (up to 10 attempts) and return the first message on the new connection.
    ///
    /// Returns `Ok(None)` if the server closes the connection cleanly and reconnection fails after
    /// all retries.
    pub async fn next_message(&mut self) -> Result<Option<Value>, Error> {
        loop {
            let stream = match self.stream.as_mut() {
                Some(s) => s,
                None => match self.reconnect().await? {
                    Some(s) => s,
                    None => return Ok(None),
                },
            };

            match stream.next().await {
                Some(Ok(Message::Text(text))) => {
                    return Ok(Some(serde_json::from_str(&text)?));
                }
                Some(Ok(Message::Binary(bytes))) => {
                    return Ok(Some(serde_json::from_slice(&bytes)?));
                }
                Some(Ok(Message::Ping(payload))) => {
                    let _ = stream.send(Message::Pong(payload)).await;
                    continue;
                }
                Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => continue,
                Some(Ok(Message::Close(_))) | None => {
                    self.stream = None;
                    continue;
                }
                Some(Err(err)) => {
                    // Transient failure — attempt reconnect.
                    self.stream = None;
                    let _ = err;
                    continue;
                }
            }
        }
    }

    /// Send a JSON message to the server.
    pub async fn send(&mut self, value: &Value) -> Result<(), Error> {
        let Some(stream) = self.stream.as_mut() else {
            return Err(Error::Network {
                message: "websocket is not connected".into(),
                source: None,
            });
        };
        let text = serde_json::to_string(value)?;
        stream
            .send(Message::Text(text))
            .await
            .map_err(|e| Error::Network {
                message: e.to_string(),
                source: Some(Box::new(e)),
            })?;
        Ok(())
    }

    /// Close the connection.
    pub async fn close(mut self) -> Result<(), Error> {
        if let Some(mut stream) = self.stream.take() {
            let _ = stream.close(None).await;
        }
        Ok(())
    }

    async fn reconnect(&mut self) -> Result<Option<&mut WsStream>, Error> {
        for attempt in 1..=MAX_RECONNECT_ATTEMPTS {
            if let Some(cb) = self.on_reconnect.as_mut() {
                cb(attempt);
            }
            let delay = backoff(attempt);
            tokio::time::sleep(delay).await;
            match open(&self.url).await {
                Ok(stream) => {
                    self.stream = Some(stream);
                    return Ok(self.stream.as_mut());
                }
                Err(_) if attempt < MAX_RECONNECT_ATTEMPTS => continue,
                Err(_) => return Ok(None),
            }
        }
        Ok(None)
    }
}

/// Delay to wait before reconnection attempt number `attempt` (1-indexed).
///
/// The delay starts at `BASE_RECONNECT_DELAY` and doubles with every further attempt, never
/// exceeding `MAX_RECONNECT_DELAY`. An `attempt` of 0 is treated like 1.
fn backoff(attempt: u32) -> Duration {
    // Cap the exponent so the shift below cannot overflow, whatever `attempt` is.
    let doublings = attempt.saturating_sub(1).min(16);
    BASE_RECONNECT_DELAY
        .saturating_mul(1u32 << doublings)
        .min(MAX_RECONNECT_DELAY)
}

async fn open(url: &str) -> Result<WsStream, Error> {
    let (stream, _response) =
        tokio_tungstenite::connect_async(url)
            .await
            .map_err(|e| Error::Network {
                message: format!("websocket connect failed: {e}"),
                source: Some(Box::new(e)),
            })?;
    Ok(stream)
}