ovsdb-client 0.0.1

Async Rust client for the Open vSwitch Database Protocol with monitoring support
Documentation
mod codec;
pub mod ipc;
pub mod tcp;

use bytes::BytesMut;
use futures_util::{Sink, SinkExt, Stream, stream::StreamExt};
use jsonrpsee::core::{
    async_trait,
    client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use serde_json::{Value, json};
use thiserror::Error;

#[derive(Debug, Error)]
enum TransportError {
    #[error("Connection closed.")]
    ConnectionClosed,

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Unkown error: {0}")]
    Unknown(String),
}

struct Sender<T: Send + Sink<BytesMut>> {
    inner: T,
}

#[async_trait]
impl<T: Send + Sink<BytesMut, Error = impl std::error::Error> + Unpin + 'static> TransportSenderT
    for Sender<T>
{
    type Error = TransportError;

    async fn send(&mut self, body: String) -> Result<(), Self::Error> {
        let mut message: Value =
            serde_json::from_str(&body).map_err(|e| TransportError::Unknown(e.to_string()))?;

        // NOTE(mnaser): In order to be able to use the subscription client, we need to
        //               drop the subscription message for the "update" method, as the
        //               remote doesn't support JSON-RPC 2.0.
        if message["method"] == json!("update") {
            return Ok(());
        }

        // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
        //               support, so we intercept the message, remove "jsonrpc" and then
        //               send the message.
        message.as_object_mut().map(|obj| obj.remove("jsonrpc"));

        // NOTE(mnaser): OVSDB expects all requests to have a "params" key, so we add an
        //               empty array if it doesn't exist.
        if !message.as_object().unwrap().contains_key("params") {
            message["params"] = json!([]);
        }

        self.inner
            .send(BytesMut::from(message.to_string().as_str()))
            .await
            .map_err(|e| TransportError::Unknown(e.to_string()))?;

        Ok(())
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        self.inner
            .close()
            .await
            .map_err(|e| TransportError::Unknown(e.to_string()))?;

        Ok(())
    }
}

struct Receiver<T: Send + Stream> {
    inner: T,
}

#[async_trait]
impl<T: Send + Stream<Item = Result<Value, std::io::Error>> + Unpin + 'static> TransportReceiverT
    for Receiver<T>
{
    type Error = TransportError;

    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
        match self.inner.next().await {
            None => Err(TransportError::ConnectionClosed),
            Some(Ok(mut message)) => {
                // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
                //               support, so we intercept the message, add "jsonrpc" and then
                //               send the message.
                message
                    .as_object_mut()
                    .map(|obj| obj.insert("jsonrpc".to_string(), json!("2.0")));

                // NOTE(mnaser): jsonrpsee expects no error field if there is a result, due to the
                //               remote not supporting JSON-RPC 2.0, we need to remove the "error"
                //               field if there is a "result" field.
                if message.as_object().unwrap().contains_key("result") {
                    message.as_object_mut().map(|obj| obj.remove("error"));
                }

                // NOTE(mnaser): If a message comes in with it's "id" field set to null, then
                //               we remove it.
                if message.as_object().unwrap().contains_key("id") && message["id"] == json!(null) {
                    message.as_object_mut().map(|obj| obj.remove("id"));
                }

                Ok(ReceivedMessage::Bytes(message.to_string().into_bytes()))
            }
            Some(Err(e)) => Err(TransportError::Io(e)),
        }
    }
}