use crate::error::{NexarError, Result};
use crate::protocol::NexarMessage;
use crate::rpc::registry::RpcRegistry;
use crate::transport::PeerConnection;
use crate::types::Priority;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
/// Routes RPC calls to the handlers stored in a shared [`RpcRegistry`].
///
/// The dispatcher owns no handlers itself; it only keeps a reference-counted
/// handle to the registry and looks handlers up by function id on each call.
pub struct RpcDispatcher {
/// Registry consulted on every dispatch, shared via `Arc` and guarded by an
/// async `RwLock`.
registry: Arc<RwLock<RpcRegistry>>,
}
impl RpcDispatcher {
pub fn new(registry: Arc<RwLock<RpcRegistry>>) -> Self {
Self { registry }
}
pub async fn dispatch(&self, fn_id: u16, payload: &[u8]) -> Result<Vec<u8>> {
let reg = self.registry.read().await;
let handler = reg
.get(fn_id)
.ok_or(NexarError::RpcNotRegistered { fn_id })?;
let response = handler(payload);
Ok(response)
}
pub async fn handle_request(
&self,
peer: &PeerConnection,
req_id: u64,
fn_id: u16,
payload: &[u8],
) -> Result<()> {
let response_payload = self.dispatch(fn_id, payload).await?;
let response = NexarMessage::RpcResponse {
req_id,
payload: response_payload,
};
peer.send_message(&response, Priority::Realtime).await
}
pub async fn serve(
&self,
peer: &PeerConnection,
incoming: &mut mpsc::Receiver<NexarMessage>,
) -> Result<()> {
while let Some(msg) = incoming.recv().await {
if let NexarMessage::Rpc {
req_id,
fn_id,
payload,
} = msg
{
self.handle_request(peer, req_id, fn_id, &payload).await?;
}
}
Ok(())
}
}