ogurpchik 0.1.0

A transport-agnostic RPC framework for stream and memory-based communication. Built with high-performance primitives to deliver medium-performance results.
use crate::align_buffer::AlignedBuffer;
use crate::message_protocol::{Message, MessageProtocol};
use crate::tpc_pool::TpcPool;
use crate::transport::raw::{IncomingMsg, MsgBatch, RawTransport, SendHandle};
use crate::ServiceHandler;
use compio::buf::IoBuf;
use std::collections::HashMap;
use std::rc::Rc;

use dashmap::DashMap;
use std::sync::Arc;
use tracing::error;
pub async fn session_loop<P, H>(
    incoming_rx: flume::Receiver<AlignedBuffer>,
    peer: SendHandle,
    pending: Rc<DashMap<u64, oneshot::Sender<AlignedBuffer>>>,
    handler: H,
) where
    P: MessageProtocol,
    H: ServiceHandler<P>,
{
    while let Ok(first_raw) = incoming_rx.recv_async().await {
        let mut in_batch = MsgBatch::new();
        in_batch.push(first_raw);

        while in_batch.len() < in_batch.capacity() {
            if let Ok(next_raw) = incoming_rx.try_recv() {
                in_batch.push(next_raw);
            } else {
                break;
            }
        }

        let handler = handler.clone();
        let mut pending = pending.clone();
        let peer = peer.clone();
        compio::runtime::spawn(async move {
            let mut out_batch = MsgBatch::new();

            for raw in in_batch {
                match P::decode(&raw.0) {
                    Ok(Message::Request { id, payload }) => {
                        if let Ok(resp) = handler.on_request(payload).await {
                            let out_buf = TpcPool::acquire_body(0);
                            if let Ok(final_buf) =
                                P::encode(Message::Response { id, payload: resp }, out_buf)
                            {
                                out_batch.push(final_buf);
                            }
                        }
                        TpcPool::release_body(raw);
                    }
                    Ok(Message::Push { payload }) => {
                        let _ = handler.on_request(payload).await;
                        TpcPool::release_body(raw);
                    }
                    Ok(Message::Response { id, .. }) => {
                        if let Some((_, tx)) = pending.remove(&id) {
                            let _ = tx.send(raw);
                        } else {
                            TpcPool::release_body(raw);
                        }
                    }
                    Err(e) => {
                        error!("Protocol decode error: {e}");
                        TpcPool::release_body(raw);
                    }
                }
            }

            if !out_batch.is_empty() {
                let _ = peer.send_batch(out_batch).await;
            }
        })
        .detach();
    }
}

pub async fn handle_connection<P, H, T>(transport: T, handler: H) -> anyhow::Result<()>
where
    P: MessageProtocol,
    T: RawTransport,
    H: ServiceHandler<P>,
{
    let (peer_handle, rx) = transport.decompose()?;

    let pending = Rc::new(DashMap::new());

    session_loop::<P, H>(rx, peer_handle, pending, handler).await;

    Ok(())
}