use super::protocol::{parse_message, parse_request, serialize_reply};
use super::types::{ParseResult, ServerEvent, Transmit};
use crate::{Reply, Result};
use std::collections::VecDeque;
#[derive(Debug, Clone, PartialEq)]
pub enum ServerState {
Receiving,
Processing,
Upgraded {
interface: String,
},
Error {
message: String,
},
}
#[derive(Debug)]
pub struct Server {
state: ServerState,
send_buf: VecDeque<Transmit>,
recv_buf: Vec<u8>,
pending_events: VecDeque<ServerEvent>,
}
impl Server {
pub fn new() -> Self {
Self {
state: ServerState::Receiving,
send_buf: VecDeque::new(),
recv_buf: Vec::new(),
pending_events: VecDeque::new(),
}
}
pub fn handle_input(&mut self, data: &[u8]) -> Result<()> {
self.recv_buf.extend_from_slice(data);
loop {
match parse_message(&self.recv_buf) {
ParseResult::Complete { message, consumed } => {
self.recv_buf.drain(..consumed);
let request = parse_request(&message)?;
if request.upgrade.unwrap_or(false) {
let interface = request
.method
.rsplit_once('.')
.map(|x| x.0)
.unwrap_or(&request.method)
.to_string();
self.pending_events.push_back(ServerEvent::Upgrade {
interface: interface.clone(),
});
self.state = ServerState::Upgraded { interface };
} else {
self.pending_events
.push_back(ServerEvent::Request { request });
self.state = ServerState::Processing;
}
}
ParseResult::Incomplete { .. } => {
break;
}
ParseResult::Invalid { error } => {
self.state = ServerState::Error {
message: error.clone(),
};
return Err(crate::context!(crate::ErrorKind::InvalidParameter(error)));
}
}
}
Ok(())
}
pub fn send_reply(&mut self, reply: Reply) -> Result<()> {
let payload = serialize_reply(&reply)?;
self.send_buf.push_back(Transmit::new(payload));
if !reply.continues.unwrap_or(false) && self.state == ServerState::Processing {
self.state = ServerState::Receiving;
}
Ok(())
}
pub fn poll_transmit(&mut self) -> Option<Transmit> {
self.send_buf.pop_front()
}
pub fn poll_event(&mut self) -> Option<ServerEvent> {
self.pending_events.pop_front()
}
pub fn state(&self) -> &ServerState {
&self.state
}
pub fn is_receiving(&self) -> bool {
matches!(self.state, ServerState::Receiving)
}
pub fn is_upgraded(&self) -> bool {
matches!(self.state, ServerState::Upgraded { .. })
}
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_new() {
let server = Server::new();
assert!(server.is_receiving());
}
#[test]
fn test_receive_request() {
let mut server = Server::new();
let request_data = b"{\"method\":\"org.example.Ping\"}\0";
server.handle_input(request_data).unwrap();
let event = server.poll_event();
assert!(event.is_some());
if let Some(ServerEvent::Request { request }) = event {
assert_eq!(request.method, "org.example.Ping");
} else {
panic!("Expected request event");
}
}
#[test]
fn test_send_reply() {
let mut server = Server::new();
let reply = Reply {
parameters: None,
continues: None,
error: None,
};
server.send_reply(reply).unwrap();
assert!(server.poll_transmit().is_some());
}
#[test]
fn test_request_reply_cycle() {
let mut server = Server::new();
let request_data = b"{\"method\":\"org.example.Ping\"}\0";
server.handle_input(request_data).unwrap();
let event = server.poll_event();
assert!(event.is_some());
let reply = Reply {
parameters: None,
continues: None,
error: None,
};
server.send_reply(reply).unwrap();
let transmit = server.poll_transmit();
assert!(transmit.is_some());
assert!(server.is_receiving());
}
}