1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#[cfg(test)]
mod agent_test;

use crate::errors::*;
use crate::message::*;

use util::Error;

use tokio::sync::mpsc;
use tokio::time::Instant;

use std::collections::HashMap;
use std::sync::Arc;

use crate::client::ClientTransaction;
use rand::Rng;

// Handler is the sink that receives the transaction events emitted by Agent.
//
// It is an optional, shareable sending half of an unbounded tokio mpsc
// channel carrying Event values; each Event is moved into the channel.
// `None` means "no listener": Agent skips sending entirely in that case
// (see noop_handler).
pub type Handler = Option<Arc<mpsc::UnboundedSender<Event>>>;

// noop_handler just discards any event.
pub fn noop_handler() -> Handler {
    None
}

// Agent is a low-level abstraction over the set of in-flight transactions.
// It tracks each transaction's deadline and reports message, stop, time-out
// (via collect) and close events to the handler. It is driven by ClientAgent
// commands consumed in Agent::run, which owns the Agent.
pub struct Agent {
    // transactions maps the id of every in-progress transaction to its
    // bookkeeping record. stop, process and collect remove the entry from
    // this map before sending the corresponding event to the handler.
    transactions: HashMap<TransactionId, AgentTransaction>,
    closed: bool,     // once true, every operation fails with ERR_AGENT_CLOSED
    handler: Handler, // receives transaction events; None discards them
}

// EventType identifies what an Event refers to.
//
// Agent itself only ever emits Callback events. The remaining variants are
// not produced anywhere in this file; presumably they are used by the client
// layer (TODO confirm against crate::client).
#[derive(Debug, Clone)]
pub enum EventType {
    // Outcome (message or error) for the transaction with this id.
    Callback(TransactionId),
    // NOTE(review): looks like a request to register a client transaction — confirm.
    Insert(ClientTransaction),
    // NOTE(review): looks like a request to unregister the transaction with this id — confirm.
    Remove(TransactionId),
    // NOTE(review): looks like a shutdown notification — confirm.
    Close,
}

impl Default for EventType {
    fn default() -> Self {
        EventType::Callback(TransactionId::default())
    }
}

// Event is the value sent to the Handler to describe a transaction event.
#[derive(Debug, Clone)]
pub struct Event {
    // What the event refers to. Events emitted by Agent are always
    // EventType::Callback with the id of the affected transaction.
    pub event_type: EventType,
    // Ok(message) when Agent processed an incoming message; Err(..) when the
    // transaction ended because it was stopped, timed out, or the agent was
    // closed.
    pub event_body: Result<Message, Error>,
}

// The default Event pairs the default EventType with an Ok body holding a
// default Message.
impl Default for Event {
    fn default() -> Self {
        let event_type = EventType::default();
        let event_body = Ok(Message::default());
        Self {
            event_type,
            event_body,
        }
    }
}

// AgentTransaction is the Agent's bookkeeping record for one transaction in
// progress.
pub(crate) struct AgentTransaction {
    // Same id the record is stored under in Agent.transactions.
    id: TransactionId,
    // Agent::collect times the transaction out when this instant is earlier
    // than the cut-off it is given.
    deadline: Instant,
}

// AGENT_COLLECT_CAP is the initial capacity of the scratch vector that
// Agent::collect fills with the ids of timed-out transactions. The vector
// only has to grow when more than this many transactions expire in a single
// collect call.
const AGENT_COLLECT_CAP: usize = 100;

// TransactionId identifies a transaction: a fixed-size array of
// TRANSACTION_ID_SIZE bytes. It is Copy and hashable, so it can be used
// directly as a map key; the derived Default is the all-zero id.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Default, Debug)]
pub struct TransactionId(pub [u8; TRANSACTION_ID_SIZE]);

impl TransactionId {
    // NewTransactionID returns new random transaction ID using crypto/rand
    // as source.
    pub fn new() -> Self {
        let mut b = TransactionId([0u8; TRANSACTION_ID_SIZE]);
        rand::thread_rng().fill(&mut b.0);
        b
    }
}

// Setter implementation: stamps this transaction id onto a message.
impl Setter for TransactionId {
    // add_to stores a copy of this id in m.transaction_id and then calls
    // m.write_transaction_id(). That method is defined on Message outside
    // this file; presumably it re-encodes the id into the message's raw
    // bytes — TODO confirm. The assignment must come first so the call sees
    // the new id. Always returns Ok(()).
    fn add_to(&self, m: &mut Message) -> Result<(), Error> {
        m.transaction_id = *self;
        m.write_transaction_id();
        Ok(())
    }
}

// ClientAgent is the set of commands that can be sent over the channel
// consumed by Agent::run. Each variant is dispatched to the Agent method of
// the matching name.
pub enum ClientAgent {
    // Handle an incoming message (Agent::process).
    Process(Message),
    // Time out transactions whose deadline is before this instant
    // (Agent::collect).
    Collect(Instant),
    // Register a transaction with the given id and deadline (Agent::start).
    Start(TransactionId, Instant),
    // Stop the transaction with the given id (Agent::stop).
    Stop(TransactionId),
    // Close the agent (Agent::close).
    Close,
}

// NewAgent initializes and returns new Agent with provided handler.
// If h is nil, the noop_handler will be used.
impl Agent {
    pub fn new(handler: Handler) -> Self {
        Agent {
            transactions: HashMap::new(),
            closed: false,
            handler,
        }
    }

    // stop_with_error removes transaction from list and calls handler with
    // provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
    fn stop_with_error(&mut self, id: TransactionId, error: Error) -> Result<(), Error> {
        if self.closed {
            return Err(ERR_AGENT_CLOSED.clone());
        }

        let v = self.transactions.remove(&id);
        if let Some(t) = v {
            if let Some(handler) = &self.handler {
                handler.send(Event {
                    event_type: EventType::Callback(t.id),
                    event_body: Err(error),
                })?;
            }
            Ok(())
        } else {
            Err(ERR_TRANSACTION_NOT_EXISTS.clone())
        }
    }

    // process incoming message, synchronously passing it to handler.
    fn process(&mut self, message: Message) -> Result<(), Error> {
        if self.closed {
            return Err(ERR_AGENT_CLOSED.clone());
        }

        self.transactions.remove(&message.transaction_id);

        let e = Event {
            event_type: EventType::Callback(message.transaction_id),
            event_body: Ok(message),
        };

        if let Some(handler) = &self.handler {
            handler.send(e)?;
        }

        Ok(())
    }

    // Close terminates all transactions with ErrAgentClosed and renders Agent to
    // closed state.
    fn close(&mut self) -> Result<(), Error> {
        if self.closed {
            return Err(ERR_AGENT_CLOSED.clone());
        }

        for id in self.transactions.keys() {
            let e = Event {
                event_type: EventType::Callback(*id),
                event_body: Err(ERR_AGENT_CLOSED.clone()),
            };
            if let Some(handler) = &self.handler {
                handler.send(e)?;
            }
        }
        self.transactions = HashMap::new();
        self.closed = true;
        self.handler = noop_handler();

        Ok(())
    }

    // Start registers transaction with provided id and deadline.
    // Could return ErrAgentClosed, ErrTransactionExists.
    //
    // Agent handler is guaranteed to be eventually called.
    fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<(), Error> {
        if self.closed {
            return Err(ERR_AGENT_CLOSED.clone());
        }
        if self.transactions.contains_key(&id) {
            return Err(ERR_TRANSACTION_EXISTS.clone());
        }

        self.transactions
            .insert(id, AgentTransaction { id, deadline });

        Ok(())
    }

    // Stop stops transaction by id with ErrTransactionStopped, blocking
    // until handler returns.
    fn stop(&mut self, id: TransactionId) -> Result<(), Error> {
        self.stop_with_error(id, ERR_TRANSACTION_STOPPED.clone())
    }

    // Collect terminates all transactions that have deadline before provided
    // time, blocking until all handlers will process ErrTransactionTimeOut.
    // Will return ErrAgentClosed if agent is already closed.
    //
    // It is safe to call Collect concurrently but makes no sense.
    fn collect(&mut self, deadline: Instant) -> Result<(), Error> {
        if self.closed {
            // Doing nothing if agent is closed.
            // All transactions should be already closed
            // during Close() call.
            return Err(ERR_AGENT_CLOSED.clone());
        }

        let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);

        // Adding all transactions with deadline before gc_time
        // to toCall and to_remove slices.
        // No allocs if there are less than AGENT_COLLECT_CAP
        // timed out transactions.
        for (id, t) in &self.transactions {
            if t.deadline < deadline {
                to_remove.push(*id);
            }
        }
        // Un-registering timed out transactions.
        for id in &to_remove {
            self.transactions.remove(id);
        }

        for id in to_remove {
            let event = Event {
                event_type: EventType::Callback(id),
                event_body: Err(ERR_TRANSACTION_TIME_OUT.clone()),
            };
            if let Some(handler) = &self.handler {
                handler.send(event)?;
            }
        }

        Ok(())
    }

    // set_handler sets agent handler to h.
    fn set_handler(&mut self, h: Handler) -> Result<(), Error> {
        if self.closed {
            return Err(ERR_AGENT_CLOSED.clone());
        }
        self.handler = h;

        Ok(())
    }

    pub async fn run(mut agent: Agent, mut rx: mpsc::Receiver<ClientAgent>) {
        while let Some(client_agent) = rx.recv().await {
            let result = match client_agent {
                ClientAgent::Process(message) => agent.process(message),
                ClientAgent::Collect(deadline) => agent.collect(deadline),
                ClientAgent::Start(tid, deadline) => agent.start(tid, deadline),
                ClientAgent::Stop(tid) => agent.stop(tid),
                ClientAgent::Close => agent.close(),
            };

            if let Err(err) = result {
                if err == *ERR_AGENT_CLOSED {
                    break;
                }
            }
        }
    }
}