1#[cfg(test)]
2mod agent_test;
3
4use shared::error::*;
5use std::collections::{HashMap, VecDeque};
6use std::time::Instant;
7
8use crate::message::*;
9
10#[derive(Default)]
13pub struct Agent {
14 transactions: HashMap<TransactionId, AgentTransaction>,
20 closed: bool,
22 events_queue: VecDeque<Event>,
24}
25
26#[derive(Debug)] pub struct Event {
30 pub id: TransactionId,
31 pub result: Result<Message>,
32}
33
34impl Default for Event {
35 fn default() -> Self {
36 Event {
37 id: TransactionId::default(),
38 result: Ok(Message::default()),
39 }
40 }
41}
42
43pub(crate) struct AgentTransaction {
46 id: TransactionId,
47 deadline: Instant,
48}
49
50const AGENT_COLLECT_CAP: usize = 100;
53
54#[derive(Debug)]
57pub enum ClientAgent {
58 Process(Message),
59 Collect(Instant),
60 Start(TransactionId, Instant),
61 Stop(TransactionId),
62 Close,
63}
64
65impl Agent {
66 pub fn new() -> Self {
68 Agent {
69 transactions: HashMap::new(),
70 closed: false,
71 events_queue: VecDeque::new(),
72 }
73 }
74
75 pub fn handle_event(&mut self, client_agent: ClientAgent) -> Result<()> {
76 match client_agent {
77 ClientAgent::Process(message) => self.process(message),
78 ClientAgent::Collect(deadline) => self.collect(deadline),
79 ClientAgent::Start(tid, deadline) => self.start(tid, deadline),
80 ClientAgent::Stop(tid) => self.stop(tid),
81 ClientAgent::Close => self.close(),
82 }
83 }
84
85 pub fn poll_timeout(&mut self) -> Option<Instant> {
86 let mut deadline = None;
87 for transaction in self.transactions.values() {
88 if deadline.is_none() || transaction.deadline < *deadline.as_ref().unwrap() {
89 deadline = Some(transaction.deadline);
90 }
91 }
92 deadline
93 }
94
95 pub fn poll_event(&mut self) -> Option<Event> {
96 self.events_queue.pop_front()
97 }
98
99 fn process(&mut self, message: Message) -> Result<()> {
101 if self.closed {
102 return Err(Error::ErrAgentClosed);
103 }
104
105 self.transactions.remove(&message.transaction_id);
106
107 self.events_queue.push_back(Event {
108 id: message.transaction_id,
109 result: Ok(message),
110 });
111
112 Ok(())
113 }
114
115 fn close(&mut self) -> Result<()> {
118 if self.closed {
119 return Err(Error::ErrAgentClosed);
120 }
121
122 for id in self.transactions.keys() {
123 self.events_queue.push_back(Event {
124 id: *id,
125 result: Err(Error::ErrAgentClosed),
126 });
127 }
128 self.transactions.clear();
129 self.closed = true;
130
131 Ok(())
132 }
133
134 fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
139 if self.closed {
140 return Err(Error::ErrAgentClosed);
141 }
142 if self.transactions.contains_key(&id) {
143 return Err(Error::ErrTransactionExists);
144 }
145
146 self.transactions
147 .insert(id, AgentTransaction { id, deadline });
148
149 Ok(())
150 }
151
152 fn stop(&mut self, id: TransactionId) -> Result<()> {
155 if self.closed {
156 return Err(Error::ErrAgentClosed);
157 }
158
159 let v = self.transactions.remove(&id);
160 if let Some(t) = v {
161 self.events_queue.push_back(Event {
162 id: t.id,
163 result: Err(Error::ErrTransactionStopped),
164 });
165 Ok(())
166 } else {
167 Err(Error::ErrTransactionNotExists)
168 }
169 }
170
171 fn collect(&mut self, deadline: Instant) -> Result<()> {
177 if self.closed {
178 return Err(Error::ErrAgentClosed);
182 }
183
184 let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
185
186 for (id, t) in &self.transactions {
191 if t.deadline < deadline {
192 to_remove.push(*id);
193 }
194 }
195 for id in &to_remove {
197 self.transactions.remove(id);
198 }
199
200 for id in to_remove {
201 self.events_queue.push_back(Event {
202 id,
203 result: Err(Error::ErrTransactionTimeOut),
204 });
205 }
206
207 Ok(())
208 }
209}