#[cfg(test)]
mod agent_test;
use std::collections::HashMap;
use std::sync::Arc;
use rand::Rng;
use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::client::ClientTransaction;
use crate::error::*;
use crate::message::*;
pub type Handler = Option<Arc<mpsc::UnboundedSender<Event>>>;
pub fn noop_handler() -> Handler {
None
}
pub struct Agent {
transactions: HashMap<TransactionId, AgentTransaction>,
closed: bool,
handler: Handler,
}
#[derive(Debug, Clone)]
pub enum EventType {
Callback(TransactionId),
Insert(ClientTransaction),
Remove(TransactionId),
Close,
}
impl Default for EventType {
fn default() -> Self {
EventType::Callback(TransactionId::default())
}
}
#[derive(Debug)] pub struct Event {
pub event_type: EventType,
pub event_body: Result<Message>,
}
impl Default for Event {
fn default() -> Self {
Event {
event_type: EventType::default(),
event_body: Ok(Message::default()),
}
}
}
pub(crate) struct AgentTransaction {
id: TransactionId,
deadline: Instant,
}
const AGENT_COLLECT_CAP: usize = 100;
#[derive(PartialEq, Eq, Hash, Copy, Clone, Default, Debug)]
pub struct TransactionId(pub [u8; TRANSACTION_ID_SIZE]);
impl TransactionId {
pub fn new() -> Self {
let mut b = TransactionId([0u8; TRANSACTION_ID_SIZE]);
rand::rng().fill(&mut b.0);
b
}
}
impl Setter for TransactionId {
fn add_to(&self, m: &mut Message) -> Result<()> {
m.transaction_id = *self;
m.write_transaction_id();
Ok(())
}
}
#[derive(Debug)]
pub enum ClientAgent {
Process(Message),
Collect(Instant),
Start(TransactionId, Instant),
Stop(TransactionId),
Close,
}
impl Agent {
pub fn new(handler: Handler) -> Self {
Agent {
transactions: HashMap::new(),
closed: false,
handler,
}
}
pub fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
let t = self
.transactions
.remove(&id)
.ok_or(Error::ErrTransactionNotExists)?;
if let Some(handler) = &self.handler {
handler.send(Event {
event_type: EventType::Callback(t.id),
event_body: Err(error),
})?;
}
Ok(())
}
pub fn process(&mut self, message: Message) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
self.transactions.remove(&message.transaction_id);
let e = Event {
event_type: EventType::Callback(message.transaction_id),
event_body: Ok(message),
};
if let Some(handler) = &self.handler {
handler.send(e)?;
}
Ok(())
}
pub fn close(&mut self) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
for id in self.transactions.keys() {
let e = Event {
event_type: EventType::Callback(*id),
event_body: Err(Error::ErrAgentClosed),
};
if let Some(handler) = &self.handler {
handler.send(e)?;
}
}
self.transactions = HashMap::new();
self.closed = true;
self.handler = noop_handler();
Ok(())
}
pub fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
if self.transactions.contains_key(&id) {
return Err(Error::ErrTransactionExists);
}
self.transactions
.insert(id, AgentTransaction { id, deadline });
Ok(())
}
pub fn stop(&mut self, id: TransactionId) -> Result<()> {
self.stop_with_error(id, Error::ErrTransactionStopped)
}
pub fn collect(&mut self, deadline: Instant) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
for (id, t) in &self.transactions {
if t.deadline < deadline {
to_remove.push(*id);
}
}
for id in &to_remove {
self.transactions.remove(id);
}
for id in to_remove {
let event = Event {
event_type: EventType::Callback(id),
event_body: Err(Error::ErrTransactionTimeOut),
};
if let Some(handler) = &self.handler {
handler.send(event)?;
}
}
Ok(())
}
pub fn set_handler(&mut self, h: Handler) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
self.handler = h;
Ok(())
}
pub(crate) async fn run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>) {
while let Some(client_agent) = rx.recv().await {
let result = match client_agent {
ClientAgent::Process(message) => agent.process(message),
ClientAgent::Collect(deadline) => agent.collect(deadline),
ClientAgent::Start(tid, deadline) => agent.start(tid, deadline),
ClientAgent::Stop(tid) => agent.stop(tid),
ClientAgent::Close => agent.close(),
};
if let Err(err) = result {
if Error::ErrAgentClosed == err {
break;
}
}
}
}
}