1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#[cfg(test)]
mod agent_test;
use shared::error::*;
use std::collections::{HashMap, VecDeque};
use std::time::Instant;
use crate::message::*;
/// Agent is low-level abstraction over transaction list that
/// handles concurrency and time outs (via Collect call).
#[derive(Default)]
pub struct Agent {
/// transactions is map of transactions that are currently
/// in progress. Event handling is done in such way when
/// transaction is unregistered before AgentTransaction access,
/// minimizing mux lock and protecting AgentTransaction from
/// data races via unexpected concurrent access.
transactions: HashMap<TransactionId, AgentTransaction>,
/// all calls are invalid if true
closed: bool,
/// events queue
events_queue: VecDeque<Event>,
}
/// Event is passed to Handler describing the transaction event.
/// Do not reuse outside Handler.
#[derive(Debug)] //Clone
pub struct Event {
pub id: TransactionId,
pub result: Result<Message>,
}
impl Default for Event {
fn default() -> Self {
Event {
id: TransactionId::default(),
result: Ok(Message::default()),
}
}
}
/// AgentTransaction represents transaction in progress.
/// Concurrent access is invalid.
pub(crate) struct AgentTransaction {
id: TransactionId,
deadline: Instant,
}
/// AGENT_COLLECT_CAP is initial capacity for Agent.Collect slices,
/// sufficient to make function zero-alloc in most cases.
const AGENT_COLLECT_CAP: usize = 100;
/// ClientAgent is Agent implementation that is used by Client to
/// process transactions.
#[derive(Debug)]
pub enum ClientAgent {
Process(Message),
Collect(Instant),
Start(TransactionId, Instant),
Stop(TransactionId),
Close,
}
impl Agent {
/// new initializes and returns new Agent with provided handler.
pub fn new() -> Self {
Agent {
transactions: HashMap::new(),
closed: false,
events_queue: VecDeque::new(),
}
}
pub fn handle_event(&mut self, client_agent: ClientAgent) -> Result<()> {
match client_agent {
ClientAgent::Process(message) => self.process(message),
ClientAgent::Collect(deadline) => self.collect(deadline),
ClientAgent::Start(tid, deadline) => self.start(tid, deadline),
ClientAgent::Stop(tid) => self.stop(tid),
ClientAgent::Close => self.close(),
}
}
pub fn poll_timeout(&mut self) -> Option<Instant> {
let mut deadline = None;
for transaction in self.transactions.values() {
if deadline.is_none() || transaction.deadline < *deadline.as_ref().unwrap() {
deadline = Some(transaction.deadline);
}
}
deadline
}
pub fn poll_event(&mut self) -> Option<Event> {
self.events_queue.pop_front()
}
/// process incoming message, synchronously passing it to handler.
fn process(&mut self, message: Message) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
self.transactions.remove(&message.transaction_id);
self.events_queue.push_back(Event {
id: message.transaction_id,
result: Ok(message),
});
Ok(())
}
/// close terminates all transactions with ErrAgentClosed and renders Agent to
/// closed state.
fn close(&mut self) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
for id in self.transactions.keys() {
self.events_queue.push_back(Event {
id: *id,
result: Err(Error::ErrAgentClosed),
});
}
self.transactions.clear();
self.closed = true;
Ok(())
}
/// start registers transaction with provided id and deadline.
/// Could return ErrAgentClosed, ErrTransactionExists.
///
/// Agent handler is guaranteed to be eventually called.
fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
if self.transactions.contains_key(&id) {
return Err(Error::ErrTransactionExists);
}
self.transactions
.insert(id, AgentTransaction { id, deadline });
Ok(())
}
/// stop stops transaction by id with ErrTransactionStopped, blocking
/// until handler returns.
fn stop(&mut self, id: TransactionId) -> Result<()> {
if self.closed {
return Err(Error::ErrAgentClosed);
}
let v = self.transactions.remove(&id);
if let Some(t) = v {
self.events_queue.push_back(Event {
id: t.id,
result: Err(Error::ErrTransactionStopped),
});
Ok(())
} else {
Err(Error::ErrTransactionNotExists)
}
}
/// collect terminates all transactions that have deadline before provided
/// time, blocking until all handlers will process ErrTransactionTimeOut.
/// Will return ErrAgentClosed if agent is already closed.
///
/// It is safe to call Collect concurrently but makes no sense.
fn collect(&mut self, deadline: Instant) -> Result<()> {
if self.closed {
// Doing nothing if agent is closed.
// All transactions should be already closed
// during Close() call.
return Err(Error::ErrAgentClosed);
}
let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
// Adding all transactions with deadline before gc_time
// to toCall and to_remove slices.
// No allocs if there are less than AGENT_COLLECT_CAP
// timed out transactions.
for (id, t) in &self.transactions {
if t.deadline < deadline {
to_remove.push(*id);
}
}
// Un-registering timed out transactions.
for id in &to_remove {
self.transactions.remove(id);
}
for id in to_remove {
self.events_queue.push_back(Event {
id,
result: Err(Error::ErrTransactionTimeOut),
});
}
Ok(())
}
}