rtc_stun/
agent.rs

1#[cfg(test)]
2mod agent_test;
3
4use shared::error::*;
5use std::collections::{HashMap, VecDeque};
6use std::time::Instant;
7
8use crate::message::*;
9
10/// Agent is low-level abstraction over transaction list that
11/// handles concurrency and time outs (via Collect call).
12#[derive(Default)]
13pub struct Agent {
14    /// transactions is map of transactions that are currently
15    /// in progress. Event handling is done in such way when
16    /// transaction is unregistered before AgentTransaction access,
17    /// minimizing mux lock and protecting AgentTransaction from
18    /// data races via unexpected concurrent access.
19    transactions: HashMap<TransactionId, AgentTransaction>,
20    /// all calls are invalid if true
21    closed: bool,
22    /// events queue
23    events_queue: VecDeque<Event>,
24}
25
26/// Event is passed to Handler describing the transaction event.
27/// Do not reuse outside Handler.
28#[derive(Debug)] //Clone
29pub struct Event {
30    pub id: TransactionId,
31    pub result: Result<Message>,
32}
33
34impl Default for Event {
35    fn default() -> Self {
36        Event {
37            id: TransactionId::default(),
38            result: Ok(Message::default()),
39        }
40    }
41}
42
43/// AgentTransaction represents transaction in progress.
44/// Concurrent access is invalid.
45pub(crate) struct AgentTransaction {
46    id: TransactionId,
47    deadline: Instant,
48}
49
/// AGENT_COLLECT_CAP is the capacity hint `Agent::collect` uses when it
/// reserves its temporary list of timed-out transaction ids. The list
/// still grows past this value when more transactions expire at once.
const AGENT_COLLECT_CAP: usize = 100;
53
54/// ClientAgent is Agent implementation that is used by Client to
55/// process transactions.
56#[derive(Debug)]
57pub enum ClientAgent {
58    Process(Message),
59    Collect(Instant),
60    Start(TransactionId, Instant),
61    Stop(TransactionId),
62    Close,
63}
64
65impl Agent {
66    /// new initializes and returns new Agent with provided handler.
67    pub fn new() -> Self {
68        Agent {
69            transactions: HashMap::new(),
70            closed: false,
71            events_queue: VecDeque::new(),
72        }
73    }
74
75    pub fn handle_event(&mut self, client_agent: ClientAgent) -> Result<()> {
76        match client_agent {
77            ClientAgent::Process(message) => self.process(message),
78            ClientAgent::Collect(deadline) => self.collect(deadline),
79            ClientAgent::Start(tid, deadline) => self.start(tid, deadline),
80            ClientAgent::Stop(tid) => self.stop(tid),
81            ClientAgent::Close => self.close(),
82        }
83    }
84
85    pub fn poll_timeout(&mut self) -> Option<Instant> {
86        let mut deadline = None;
87        for transaction in self.transactions.values() {
88            if deadline.is_none() || transaction.deadline < *deadline.as_ref().unwrap() {
89                deadline = Some(transaction.deadline);
90            }
91        }
92        deadline
93    }
94
95    pub fn poll_event(&mut self) -> Option<Event> {
96        self.events_queue.pop_front()
97    }
98
99    /// process incoming message, synchronously passing it to handler.
100    fn process(&mut self, message: Message) -> Result<()> {
101        if self.closed {
102            return Err(Error::ErrAgentClosed);
103        }
104
105        self.transactions.remove(&message.transaction_id);
106
107        self.events_queue.push_back(Event {
108            id: message.transaction_id,
109            result: Ok(message),
110        });
111
112        Ok(())
113    }
114
115    /// close terminates all transactions with ErrAgentClosed and renders Agent to
116    /// closed state.
117    fn close(&mut self) -> Result<()> {
118        if self.closed {
119            return Err(Error::ErrAgentClosed);
120        }
121
122        for id in self.transactions.keys() {
123            self.events_queue.push_back(Event {
124                id: *id,
125                result: Err(Error::ErrAgentClosed),
126            });
127        }
128        self.transactions.clear();
129        self.closed = true;
130
131        Ok(())
132    }
133
134    /// start registers transaction with provided id and deadline.
135    /// Could return ErrAgentClosed, ErrTransactionExists.
136    ///
137    /// Agent handler is guaranteed to be eventually called.
138    fn start(&mut self, id: TransactionId, deadline: Instant) -> Result<()> {
139        if self.closed {
140            return Err(Error::ErrAgentClosed);
141        }
142        if self.transactions.contains_key(&id) {
143            return Err(Error::ErrTransactionExists);
144        }
145
146        self.transactions
147            .insert(id, AgentTransaction { id, deadline });
148
149        Ok(())
150    }
151
152    /// stop stops transaction by id with ErrTransactionStopped, blocking
153    /// until handler returns.
154    fn stop(&mut self, id: TransactionId) -> Result<()> {
155        if self.closed {
156            return Err(Error::ErrAgentClosed);
157        }
158
159        let v = self.transactions.remove(&id);
160        if let Some(t) = v {
161            self.events_queue.push_back(Event {
162                id: t.id,
163                result: Err(Error::ErrTransactionStopped),
164            });
165            Ok(())
166        } else {
167            Err(Error::ErrTransactionNotExists)
168        }
169    }
170
171    /// collect terminates all transactions that have deadline before provided
172    /// time, blocking until all handlers will process ErrTransactionTimeOut.
173    /// Will return ErrAgentClosed if agent is already closed.
174    ///
175    /// It is safe to call Collect concurrently but makes no sense.
176    fn collect(&mut self, deadline: Instant) -> Result<()> {
177        if self.closed {
178            // Doing nothing if agent is closed.
179            // All transactions should be already closed
180            // during Close() call.
181            return Err(Error::ErrAgentClosed);
182        }
183
184        let mut to_remove: Vec<TransactionId> = Vec::with_capacity(AGENT_COLLECT_CAP);
185
186        // Adding all transactions with deadline before gc_time
187        // to toCall and to_remove slices.
188        // No allocs if there are less than AGENT_COLLECT_CAP
189        // timed out transactions.
190        for (id, t) in &self.transactions {
191            if t.deadline < deadline {
192                to_remove.push(*id);
193            }
194        }
195        // Un-registering timed out transactions.
196        for id in &to_remove {
197            self.transactions.remove(id);
198        }
199
200        for id in to_remove {
201            self.events_queue.push_back(Event {
202                id,
203                result: Err(Error::ErrTransactionTimeOut),
204            });
205        }
206
207        Ok(())
208    }
209}