use anyhow::anyhow;
use tokio::sync::mpsc;
use tracing::{debug, error, trace};
use crate::protocol::rpc;
pub struct ResponseBuffer {
buffer: Vec<u8>,
has_content: bool,
}
impl ResponseBuffer {
pub fn with_capacity(capacity: usize) -> Self {
Self { buffer: Vec::with_capacity(capacity), has_content: false }
}
pub fn get_mut_buffer(&mut self) -> &mut Vec<u8> {
&mut self.buffer
}
pub fn mark_has_content(&mut self) {
self.has_content = true;
}
pub fn has_content(&self) -> bool {
self.has_content
}
pub fn into_inner(self) -> Vec<u8> {
self.buffer
}
pub fn clear(&mut self) {
self.buffer.clear();
self.has_content = false;
}
}
#[derive(Debug)]
pub struct RpcCommand {
pub data: Vec<u8>,
pub context: rpc::Context,
}
pub type CommandResult = Result<Option<ResponseBuffer>, anyhow::Error>;
pub type AsyncCommandProcessor = for<'a> fn(
data: &[u8],
output: &'a mut ResponseBuffer,
context: rpc::Context,
)
-> futures::future::BoxFuture<'a, anyhow::Result<bool>>;
#[derive(Debug, Clone)]
pub struct CommandQueue {
command_sender: mpsc::UnboundedSender<RpcCommand>,
}
impl CommandQueue {
pub fn new(
processor: AsyncCommandProcessor,
result_sender: mpsc::UnboundedSender<CommandResult>,
buffer_capacity: usize,
) -> Self {
let (command_sender, mut command_receiver) = mpsc::unbounded_channel::<RpcCommand>();
tokio::spawn(async move {
let mut output_buffer = ResponseBuffer::with_capacity(buffer_capacity);
while let Some(command) = command_receiver.recv().await {
trace!("Processing command from queue");
output_buffer.clear();
let result =
match processor(&command.data, &mut output_buffer, command.context).await {
Ok(true) => {
output_buffer.mark_has_content();
let buffer_to_send = std::mem::replace(
&mut output_buffer,
ResponseBuffer::with_capacity(buffer_capacity),
);
Ok(Some(buffer_to_send))
}
Ok(false) => {
Ok(None)
}
Err(e) => Err(e),
};
if let Err(e) = result_sender.send(result) {
error!("Failed to send command processing result: {:?}", e);
break;
}
}
debug!("Command queue handler finished");
});
Self { command_sender }
}
pub fn submit_command(
&self,
data: Vec<u8>,
context: rpc::Context,
) -> Result<(), anyhow::Error> {
self.command_sender
.send(RpcCommand { data, context })
.map_err(|e| anyhow!("Failed to send command: {}", e))
}
}