use super::protocol::{parse_message, parse_reply, serialize_request};
use super::types::{ClientEvent, ParseResult, Transmit};
use crate::{Request, Result};
use std::collections::VecDeque;
/// Position of a [`Client`] in the request/reply cycle.
///
/// Every field is a `String` or a `bool`, so equality is total and the type
/// derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientState {
    /// No call is outstanding; this is the only state in which a new request
    /// is accepted.
    Idle,
    /// A request has been sent and its first reply has not arrived yet.
    AwaitingReply {
        /// Method name supplied by the caller when the request was sent.
        method: String,
        /// Whether the request asked for more than one reply.
        more: bool,
    },
    /// At least one reply has arrived and the peer signalled that further
    /// replies follow.
    Receiving {
        /// Method name the incoming replies are attributed to.
        method: String,
    },
    /// NOTE(review): no code visible in this file ever enters this state;
    /// presumably reserved for connections handed over after an `upgrade`
    /// request. Confirm against the callers.
    Upgraded {
        /// Presumably the interface the connection was upgraded for; verify.
        interface: String,
    },
    /// The incoming byte stream could not be parsed.
    Error {
        /// Description reported by the message parser.
        message: String,
    },
}
/// Client-side protocol state machine driven entirely through in-memory
/// buffers: received bytes are fed in with `handle_input`, bytes to send are
/// taken out with `poll_transmit`, and decoded replies are taken out with
/// `poll_event`. None of the methods in this file perform I/O themselves.
#[derive(Debug)]
pub struct Client {
/// Current position in the request/reply cycle.
state: ClientState,
/// Serialized requests waiting to be handed out by `poll_transmit`
/// (oldest first).
send_buf: VecDeque<Transmit>,
/// Received bytes that have not yet been consumed as a complete message.
recv_buf: Vec<u8>,
/// Events produced by `handle_input`, waiting to be handed out by
/// `poll_event` (oldest first).
pending_events: VecDeque<ClientEvent>,
}
impl Client {
pub fn new() -> Self {
Self {
state: ClientState::Idle,
send_buf: VecDeque::new(),
recv_buf: Vec::new(),
pending_events: VecDeque::new(),
}
}
pub fn send_request(&mut self, method: String, request: Request) -> Result<()> {
match &self.state {
ClientState::Idle => {
let payload = serialize_request(&request)?;
self.send_buf.push_back(Transmit::new(payload));
let more = request.more.unwrap_or(false);
self.state = ClientState::AwaitingReply { method, more };
Ok(())
}
_ => Err(crate::context!(crate::ErrorKind::ConnectionBusy)),
}
}
pub fn handle_input(&mut self, data: &[u8]) -> Result<()> {
self.recv_buf.extend_from_slice(data);
loop {
match parse_message(&self.recv_buf) {
ParseResult::Complete { message, consumed } => {
self.recv_buf.drain(..consumed);
let reply = parse_reply(&message)?;
match &self.state {
ClientState::AwaitingReply { method, more } => {
let continues = reply.continues.unwrap_or(false);
self.pending_events.push_back(ClientEvent::Reply {
method: method.clone(),
reply: reply.clone(),
continues,
});
if continues && *more {
self.state = ClientState::Receiving {
method: method.clone(),
};
} else {
self.state = ClientState::Idle;
}
}
ClientState::Receiving { method } => {
let continues = reply.continues.unwrap_or(false);
self.pending_events.push_back(ClientEvent::Reply {
method: method.clone(),
reply: reply.clone(),
continues,
});
if !continues {
self.state = ClientState::Idle;
}
}
_ => {
return Err(crate::context!(crate::ErrorKind::InvalidParameter(
"Unexpected reply".into()
)));
}
}
}
ParseResult::Incomplete { .. } => {
break;
}
ParseResult::Invalid { error } => {
self.state = ClientState::Error {
message: error.clone(),
};
return Err(crate::context!(crate::ErrorKind::InvalidParameter(error)));
}
}
}
Ok(())
}
pub fn poll_transmit(&mut self) -> Option<Transmit> {
self.send_buf.pop_front()
}
pub fn poll_event(&mut self) -> Option<ClientEvent> {
self.pending_events.pop_front()
}
pub fn state(&self) -> &ClientState {
&self.state
}
pub fn is_idle(&self) -> bool {
matches!(self.state, ClientState::Idle)
}
}
impl Default for Client {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::borrow::Cow;

    /// Method name used by every request in these tests.
    const PING: &str = "org.example.Ping";

    /// Builds a fresh client and sends a plain `Ping` request (no parameters,
    /// no flags) through it.
    fn client_with_ping_sent() -> Client {
        let mut client = Client::new();
        let request = Request {
            method: Cow::Borrowed(PING),
            parameters: None,
            more: None,
            oneway: None,
            upgrade: None,
        };
        client.send_request(PING.into(), request).unwrap();
        client
    }

    /// A new client starts out idle.
    #[test]
    fn test_client_new() {
        assert!(Client::new().is_idle());
    }

    /// Sending a request queues something to transmit.
    #[test]
    fn test_send_request() {
        let mut client = client_with_ping_sent();
        let queued = client.poll_transmit();
        assert!(queued.is_some());
    }

    /// A single reply yields one event and returns the client to idle.
    #[test]
    fn test_receive_reply() {
        let mut client = client_with_ping_sent();
        let _ = client.poll_transmit();

        client.handle_input(b"{}\0").unwrap();

        assert!(client.poll_event().is_some());
        assert!(client.is_idle());
    }
}