1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
use crate::SrtpError;
use crate::header::{RtcpHeader, RtpHeader};
use crate::stream::{KeyLimitHandler, RecvStream, SendStream, StreamConfig, StreamInterface};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
/// Do not use this struct directly, use [SendSession] or [RecvSession] instead
///
/// Common interface to sender and receiver sessions
pub struct Session<T: StreamInterface> {
/// indexed by ssrc store streams and flag set to true when this stream was spawn from the
/// template stream config
streams: HashMap<u32, (T, bool)>,
/// the current template stream config
template_config: Option<StreamConfig>,
/// optionnal callback to report a key uses limit reached
key_limit_handler: Option<KeyLimitHandler>,
}
impl<T: StreamInterface> Default for Session<T> {
fn default() -> Self {
Self {
streams: HashMap::new(),
template_config: None,
key_limit_handler: None,
}
}
}
/// Common API for sender and receiver sessions
impl<T: StreamInterface> Session<T> {
/// Create a new session that can hold several streams
///
/// A session can hold:
/// - several streams directly identified by their SSRC. These streams are independants and may differ in
/// protection profile used.
/// - one stream template used to spawn a stream when a new SSRC is found in a packet header
///
pub fn new() -> Self {
Self::default()
}
/// Set a handler called when a key reaches the soft or hard uses limit
///
/// The limits are fixed: 2^31 for RTCP, 2^48 for RTP. See RFC 3711 - 9.2
///
/// SoftLimit alert is raised when there is less than 2^16 uses left
///
/// # handler argument:
/// - SrtpError: the handler is always called with a KeyLimit error. See
/// [SrtpError::KeyLimit] for details
///
/// # Notes:
/// - This handler must be set before any stream is added to the session.
pub fn set_key_limit_handler<F>(&mut self, handler: F) -> Result<(), SrtpError>
where
F: Fn(SrtpError) + Send + Sync + 'static,
{
if !self.streams.is_empty() {
return Err(SrtpError::ContextNotReady);
}
self.key_limit_handler = Some(Arc::new(handler));
Ok(())
}
/// Add (or update) a stream
///
/// if the given ssrc match an existing stream, update it, otherwise add it.
///
/// On update (used for re-keying), mki policy must be compatible with current one.
///
/// # Arguments
/// * ssrc: the SSRC that identify the stream, when None: refers to the stream template
/// * config: configuration for this stream
/// # Returns
/// Ok or an error if the stream cannot be added or updated
pub fn add_stream(
&mut self,
ssrc: Option<u32>,
config: &StreamConfig,
) -> Result<(), SrtpError> {
config.validate()?;
match ssrc {
None => {
// no ssrc, this is the template stream
// update the templated_ssrcs
for (stream, from_template) in self.streams.values_mut() {
if *from_template {
stream.update(config)?; // check config compat and update
}
}
// store the template config
self.template_config = Some(config.clone());
}
Some(ssrc) => {
// some ssrc, this is a regular stream
match self.streams.get_mut(&ssrc) {
None => {
// stream not present, add it
self.streams.insert(
ssrc,
(T::new(ssrc, config, self.key_limit_handler.clone())?, false),
);
}
Some((stream, from_template)) => {
// this stream already exits, try to update
stream.update(config)?;
*from_template = false; // make sure this flag is not set as spwaned from template
}
}
}
}
Ok(())
}
/// remove a stream
/// # Arguments
/// * ssrc: the SSRC that identify the stream
/// # Returns
/// Ok or a StreamNotFound error
pub fn remove_stream(&mut self, ssrc: u32) -> Result<(), SrtpError> {
self.streams
.remove(&ssrc)
.ok_or(SrtpError::StreamNotFound)?;
Ok(())
}
/// get the current ROC from a specific stream
///
/// # Arguments
/// - ssrc: the ssrc identifying the stream
///
/// # Returns
/// the ROC if the stream is found, a StreamNotFound error otherwise
pub fn get_roc(&self, ssrc: u32) -> Result<u32, SrtpError> {
let (stream, _from_template) = self.streams.get(&ssrc).ok_or(SrtpError::StreamNotFound)?;
Ok(stream.get_roc())
}
/// Get a stream
/// If the stream is not found, try to create it using the configuration template
/// # Arguments
/// * `ssrc` - the SSRC that identify the stream
/// # Returns
/// * stream: a mutable reference to the stream
/// * from_template : a flag set to true if this stream was created from template
/// * pending: a flag set to true if the stream was just created from template
///
/// or an error when the stream is not found
fn get_stream(&mut self, ssrc: u32) -> Result<(&mut T, bool, bool), SrtpError> {
match self.streams.entry(ssrc) {
// found the stream
Entry::Occupied(entry) => {
let (stream, from_template) = entry.into_mut();
Ok((stream, *from_template, false))
}
// stream not found: try to add it using the template
Entry::Vacant(entry) => {
let template_config = self
.template_config
.as_ref()
.ok_or(SrtpError::StreamNotFound)?;
let (stream, _from_template) = entry.insert((
T::new(ssrc, template_config, self.key_limit_handler.clone())?,
true,
));
Ok((stream, true, true))
}
}
}
/// handler for stream operation returning an error:
/// * if the stream was pending, remove it
/// * if this is an error signaling hard limit reached in a key usage -> block other stream
/// that derives from the same master key. For hard limit, the error carries the mki value,
/// remap it to the hard limit error without mki
///
/// # Arguments
/// * e: the error returned by the stream operation
/// * from_template: flag true when the stream derives from template config
/// * pending: true is this stream was newly spawned from template
fn handle_stream_error(
&mut self,
e: &SrtpError,
ssrc: u32,
from_template: bool,
pending: bool,
) {
// when stream is newly spawned, we failed to confirm its validity, remove it
if pending {
let _ = self.remove_stream(ssrc);
}
// when a templated stream raise a hard limit error
if from_template
&& let SrtpError::KeyLimit {
mki, is_dead: true, ..
} = e
{
for (stream, from_template) in self.streams.values_mut() {
if *from_template {
// kill all the templated streams derived from the same key
let _ = stream.kill_session_keys(mki);
}
}
// we must also disable this key in the template config
if let Some(cfg) = self.template_config.as_mut() {
let _ = cfg.set_keys_lifetime_mki(0, 0, 0, 0, mki);
}
};
}
}
/// Session type used for sending streams
pub type SendSession = Session<SendStream>;
/// Session type used for receiving streams
pub type RecvSession = Session<RecvStream>;
impl SendSession {
/// Encrypt a RTP packet
///
/// Packet is encrypted in place, the function gets packet ownership and gives it back
///
/// # Arguments
/// - plain : the plain RTP packet
/// - mki: optional mki to be used. It must match a mki configured in a stream with a SSRC
/// matching the one present in the packet header
///
/// # Returns
/// the encrypted packet or an error
pub fn rtp_protect_mki(
&mut self,
plain: Vec<u8>,
mki: &Option<Vec<u8>>,
) -> Result<Vec<u8>, SrtpError> {
// parse header to get ssrc
let header = RtpHeader::new(&plain)?;
// get the stream, pending is true when the stream just got created from template
let (stream, from_template, pending) = self.get_stream(header.ssrc())?;
// apply the protect
stream
.rtp_protect(&header, plain, mki)
.inspect_err(|e| self.handle_stream_error(e, header.ssrc(), from_template, pending))
}
/// Encrypt a RTP packet: convenience wrapper when mki is not used
pub fn rtp_protect(&mut self, plain: Vec<u8>) -> Result<Vec<u8>, SrtpError> {
self.rtp_protect_mki(plain, &None)
}
/// Encrypt a RTCP packet
///
/// Packet is encrypted in place, the function gets packet ownership and gives it back
///
/// # Arguments
/// - plain : the plain RTP packet
/// - mki: optional mki to be used. It must match a mki configured in a stream with a SSRC
/// matching the one present in the packet header
///
/// # Returns
/// the encrypted packet or an error
pub fn rtcp_protect_mki(
&mut self,
plain: Vec<u8>,
mki: &Option<Vec<u8>>,
) -> Result<Vec<u8>, SrtpError> {
// parse header to get ssrc
let header = RtcpHeader::new(&plain)?;
let (stream, from_template, pending) = self.get_stream(header.ssrc())?;
stream
.rtcp_protect(&header, plain, mki)
.inspect_err(|e| self.handle_stream_error(e, header.ssrc(), from_template, pending))
}
/// Encrypt a RTCP packet: convenience wrapper when mki is not used
pub fn rtcp_protect(&mut self, plain: Vec<u8>) -> Result<Vec<u8>, SrtpError> {
self.rtcp_protect_mki(plain, &None)
}
}
impl RecvSession {
/// set the ROC on a specific stream
///
/// Used for late joiners to a multicasted stream. This function is usually called just after
/// the stream is added otherwise unprotect will fail.
///
/// The ROC is confirmed in the stream after the next successfull unprotect operation
///
/// The given ROC must be lower or equal than the current one (which should be 0 if this is
/// called just after the stream is added to the session)
///
/// # Arguments
/// - ssrc: the ssrc identifying the stream
/// - roc: the roc value
///
/// # Returns Ok or
/// * StreamNotFound: no stream matching this SSRC in the session
/// * InvalidPacketIndex: given ROC < current ROC
pub fn set_roc(&mut self, ssrc: u32, roc: u32) -> Result<(), SrtpError> {
let (stream, _from_template) = self
.streams
.get_mut(&ssrc)
.ok_or(SrtpError::StreamNotFound)?;
stream.set_roc(roc)?;
Ok(())
}
/// Decrypt a RTP packet
///
/// Packet is decrypted in place, the function gets packet ownership and gives it back
///
/// # Arguments
/// - cipher : the encrypted RTP packet
///
/// # Returns
/// the decrypted RP packet or an error
pub fn rtp_unprotect(&mut self, cipher: Vec<u8>) -> Result<Vec<u8>, SrtpError> {
// parse header to get ssrc
let header = RtpHeader::new(&cipher)?;
// get (or create from template) stream and apply the transform
let (stream, from_template, pending) = self.get_stream(header.ssrc())?;
stream
.rtp_unprotect(&header, cipher)
.inspect_err(|e| self.handle_stream_error(e, header.ssrc(), from_template, pending))
}
/// Decrypt a RTCP packet
///
/// Packet is decrypted in place, the function gets packet ownership and gives it back
///
/// # Arguments
/// - cipher : the encrypted RTCP packet
///
/// # Returns
/// the decrypted RTCP packet or an error
pub fn rtcp_unprotect(&mut self, cipher: Vec<u8>) -> Result<Vec<u8>, SrtpError> {
// parse header to get ssrc
let header = RtcpHeader::new(&cipher)?;
// get (or create from template) stream and apply the transform
let (stream, from_template, pending) = self.get_stream(header.ssrc())?;
stream
.rtcp_unprotect(&header, cipher)
.inspect_err(|e| self.handle_stream_error(e, header.ssrc(), from_template, pending))
}
}