1#[cfg(test)]
2mod request_test;
3
4use crate::allocation::allocation_manager::*;
5use crate::allocation::channel_bind::ChannelBind;
6use crate::allocation::five_tuple::*;
7use crate::allocation::permission::Permission;
8use crate::auth::*;
9use crate::errors::*;
10use crate::proto::chandata::ChannelData;
11use crate::proto::channum::ChannelNumber;
12use crate::proto::data::Data;
13use crate::proto::evenport::EvenPort;
14use crate::proto::lifetime::*;
15use crate::proto::peeraddr::PeerAddress;
16use crate::proto::relayaddr::RelayedAddress;
17use crate::proto::reqtrans::RequestedTransport;
18use crate::proto::rsrvtoken::ReservationToken;
19use crate::proto::*;
20
21use stun::agent::*;
22use stun::attributes::*;
23use stun::error_code::*;
24use stun::fingerprint::*;
25use stun::integrity::*;
26use stun::message::*;
27use stun::textattrs::*;
28use stun::uattrs::*;
29use stun::xoraddr::*;
30
31use util::{Conn, Error};
32
33use std::collections::HashMap;
34use std::marker::{Send, Sync};
35use std::net::SocketAddr;
36use std::sync::Arc;
37use std::time::SystemTime;
38
39use tokio::sync::Mutex;
40use tokio::time::{Duration, Instant};
41
42use md5::{Digest, Md5};
43
44pub(crate) const MAXIMUM_ALLOCATION_LIFETIME: Duration = Duration::from_secs(3600); pub(crate) const NONCE_LIFETIME: Duration = Duration::from_secs(3600); pub struct Request {
49 pub conn: Arc<dyn Conn + Send + Sync>,
51 pub src_addr: SocketAddr,
52 pub buff: Vec<u8>,
53
54 pub allocation_manager: Arc<Manager>,
56 pub nonces: Arc<Mutex<HashMap<String, Instant>>>,
57
58 pub auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
60 pub realm: String,
61 pub channel_bind_timeout: Duration,
62}
63
64impl Request {
65 pub fn new(
66 conn: Arc<dyn Conn + Send + Sync>,
67 src_addr: SocketAddr,
68 allocation_manager: Arc<Manager>,
69 auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
70 ) -> Self {
71 Request {
72 conn,
73 src_addr,
74 buff: vec![],
75 allocation_manager,
76 nonces: Arc::new(Mutex::new(HashMap::new())),
77 auth_handler,
78 realm: String::new(),
79 channel_bind_timeout: Duration::from_secs(0),
80 }
81 }
82
83 pub async fn handle_request(&mut self) -> Result<(), Error> {
85 log::debug!(
86 "received {} bytes of udp from {} on {}",
87 self.buff.len(),
88 self.src_addr,
89 self.conn.local_addr().await?
90 );
91
92 if ChannelData::is_channel_data(&self.buff) {
93 self.handle_data_packet().await
94 } else {
95 self.handle_turn_packet().await
96 }
97 }
98
99 async fn handle_data_packet(&mut self) -> Result<(), Error> {
100 log::debug!("received DataPacket from {}", self.src_addr);
101 let mut c = ChannelData {
102 raw: self.buff.clone(),
103 ..Default::default()
104 };
105 c.decode()?;
106 self.handle_channel_data(&c).await
107 }
108
109 async fn handle_turn_packet(&mut self) -> Result<(), Error> {
110 log::debug!("handle_turn_packet");
111 let mut m = Message {
112 raw: self.buff.clone(),
113 ..Default::default()
114 };
115 m.decode()?;
116
117 self.process_message_handler(&m).await
118 }
119
120 async fn process_message_handler(&mut self, m: &Message) -> Result<(), Error> {
121 if m.typ.class == CLASS_INDICATION {
122 match m.typ.method {
123 METHOD_SEND => self.handle_send_indication(m).await,
124 _ => Err(ERR_UNEXPECTED_CLASS.to_owned()),
125 }
126 } else if m.typ.class == CLASS_REQUEST {
127 match m.typ.method {
128 METHOD_ALLOCATE => self.handle_allocate_request(m).await,
129 METHOD_REFRESH => self.handle_refresh_request(m).await,
130 METHOD_CREATE_PERMISSION => self.handle_create_permission_request(m).await,
131 METHOD_CHANNEL_BIND => self.handle_channel_bind_request(m).await,
132 METHOD_BINDING => self.handle_binding_request(m).await,
133 _ => Err(ERR_UNEXPECTED_CLASS.to_owned()),
134 }
135 } else {
136 Err(ERR_UNEXPECTED_CLASS.to_owned())
137 }
138 }
139
140 pub(crate) async fn authenticate_request(
141 &mut self,
142 m: &Message,
143 calling_method: Method,
144 ) -> Result<Option<MessageIntegrity>, Error> {
145 if !m.contains(ATTR_MESSAGE_INTEGRITY) {
146 self.respond_with_nonce(m, calling_method, CODE_UNAUTHORIZED)
147 .await?;
148 return Ok(None);
149 }
150
151 let mut nonce_attr = Nonce::new(ATTR_NONCE, String::new());
152 let mut username_attr = Username::new(ATTR_USERNAME, String::new());
153 let mut realm_attr = Realm::new(ATTR_REALM, String::new());
154 let bad_request_msg = build_msg(
155 m.transaction_id,
156 MessageType::new(calling_method, CLASS_ERROR_RESPONSE),
157 vec![Box::new(ErrorCodeAttribute {
158 code: CODE_BAD_REQUEST,
159 reason: vec![],
160 })],
161 )?;
162
163 if let Err(err) = nonce_attr.get_from(m) {
164 build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await?;
165 return Ok(None);
166 }
167
168 let to_be_deleted = {
169 let mut nonces = self.nonces.lock().await;
171
172 let to_be_deleted = if let Some(nonce_creation_time) = nonces.get(&nonce_attr.text) {
173 Instant::now().duration_since(*nonce_creation_time) >= NONCE_LIFETIME
174 } else {
175 true
176 };
177
178 if to_be_deleted {
179 nonces.remove(&nonce_attr.text);
180 }
181 to_be_deleted
182 };
183
184 if to_be_deleted {
185 self.respond_with_nonce(m, calling_method, CODE_STALE_NONCE)
186 .await?;
187 return Ok(None);
188 }
189
190 if let Err(err) = realm_attr.get_from(m) {
191 build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await?;
192 return Ok(None);
193 }
194 if let Err(err) = username_attr.get_from(m) {
195 build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await?;
196 return Ok(None);
197 }
198
199 let our_key = match self.auth_handler.auth_handle(
200 &username_attr.to_string(),
201 &realm_attr.to_string(),
202 self.src_addr,
203 ) {
204 Ok(key) => key,
205 Err(_) => {
206 build_and_send_err(
207 &self.conn,
208 self.src_addr,
209 bad_request_msg,
210 ERR_NO_SUCH_USER.to_owned(),
211 )
212 .await?;
213 return Ok(None);
214 }
215 };
216
217 let mi = MessageIntegrity(our_key);
218 if let Err(err) = mi.check(&mut m.clone()) {
219 build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await?;
220 Ok(None)
221 } else {
222 Ok(Some(mi))
223 }
224 }
225
226 async fn respond_with_nonce(
227 &mut self,
228 m: &Message,
229 calling_method: Method,
230 response_code: ErrorCode,
231 ) -> Result<(), Error> {
232 let nonce = build_nonce()?;
233
234 {
235 let mut nonces = self.nonces.lock().await;
237 if nonces.contains_key(&nonce) {
238 return Err(ERR_DUPLICATED_NONCE.to_owned());
239 }
240 nonces.insert(nonce.clone(), Instant::now());
241 }
242
243 let msg = build_msg(
244 m.transaction_id,
245 MessageType::new(calling_method, CLASS_ERROR_RESPONSE),
246 vec![
247 Box::new(ErrorCodeAttribute {
248 code: response_code,
249 reason: vec![],
250 }),
251 Box::new(Nonce::new(ATTR_NONCE, nonce)),
252 Box::new(Realm::new(ATTR_REALM, self.realm.clone())),
253 ],
254 )?;
255
256 build_and_send(&self.conn, self.src_addr, msg).await
257 }
258
259 pub(crate) async fn handle_binding_request(&mut self, m: &Message) -> Result<(), Error> {
260 log::debug!("received BindingRequest from {}", self.src_addr);
261
262 let (ip, port) = (self.src_addr.ip(), self.src_addr.port());
263
264 let msg = build_msg(
265 m.transaction_id,
266 BINDING_SUCCESS,
267 vec![
268 Box::new(XORMappedAddress { ip, port }),
269 Box::new(FINGERPRINT),
270 ],
271 )?;
272
273 build_and_send(&self.conn, self.src_addr, msg).await
274 }
275
276 pub(crate) async fn handle_allocate_request(&mut self, m: &Message) -> Result<(), Error> {
278 log::debug!("received AllocateRequest from {}", self.src_addr);
279
280 let message_integrity =
286 if let Some(mi) = self.authenticate_request(m, METHOD_ALLOCATE).await? {
287 mi
288 } else {
289 log::debug!("no MessageIntegrity");
290 return Ok(());
291 };
292
293 let five_tuple = FiveTuple {
294 src_addr: self.src_addr,
295 dst_addr: self.conn.local_addr().await?,
296 protocol: PROTO_UDP,
297 };
298 let mut requested_port = 0;
299 let mut reservation_token = "".to_owned();
300
301 if self
305 .allocation_manager
306 .get_allocation(&five_tuple)
307 .await
308 .is_some()
309 {
310 let msg = build_msg(
311 m.transaction_id,
312 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
313 vec![Box::new(ErrorCodeAttribute {
314 code: CODE_ALLOC_MISMATCH,
315 reason: vec![],
316 })],
317 )?;
318 return build_and_send_err(
319 &self.conn,
320 self.src_addr,
321 msg,
322 ERR_RELAY_ALREADY_ALLOCATED_FOR_FIVE_TUPLE.to_owned(),
323 )
324 .await;
325 }
326
327 let mut requested_transport = RequestedTransport::default();
334 if let Err(err) = requested_transport.get_from(m) {
335 let bad_request_msg = build_msg(
336 m.transaction_id,
337 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
338 vec![Box::new(ErrorCodeAttribute {
339 code: CODE_BAD_REQUEST,
340 reason: vec![],
341 })],
342 )?;
343 return build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await;
344 } else if requested_transport.protocol != PROTO_UDP {
345 let msg = build_msg(
346 m.transaction_id,
347 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
348 vec![Box::new(ErrorCodeAttribute {
349 code: CODE_UNSUPPORTED_TRANS_PROTO,
350 reason: vec![],
351 })],
352 )?;
353 return build_and_send_err(
354 &self.conn,
355 self.src_addr,
356 msg,
357 ERR_REQUESTED_TRANSPORT_MUST_BE_UDP.to_owned(),
358 )
359 .await;
360 }
361
362 if m.contains(ATTR_DONT_FRAGMENT) {
368 let msg = build_msg(
369 m.transaction_id,
370 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
371 vec![
372 Box::new(ErrorCodeAttribute {
373 code: CODE_UNKNOWN_ATTRIBUTE,
374 reason: vec![],
375 }),
376 Box::new(UnknownAttributes(vec![ATTR_DONT_FRAGMENT])),
377 ],
378 )?;
379 return build_and_send_err(
380 &self.conn,
381 self.src_addr,
382 msg,
383 ERR_NO_DONT_FRAGMENT_SUPPORT.to_owned(),
384 )
385 .await;
386 }
387
388 let mut reservation_token_attr = ReservationToken::default();
397 if reservation_token_attr.get_from(m).is_ok() {
398 let mut even_port = EvenPort::default();
399 if even_port.get_from(m).is_ok() {
400 let bad_request_msg = build_msg(
401 m.transaction_id,
402 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
403 vec![Box::new(ErrorCodeAttribute {
404 code: CODE_BAD_REQUEST,
405 reason: vec![],
406 })],
407 )?;
408 return build_and_send_err(
409 &self.conn,
410 self.src_addr,
411 bad_request_msg,
412 ERR_REQUEST_WITH_RESERVATION_TOKEN_AND_EVEN_PORT.to_owned(),
413 )
414 .await;
415 }
416 }
417
418 let mut even_port = EvenPort::default();
425 if even_port.get_from(m).is_ok() {
426 let mut random_port = 1;
427
428 while random_port % 2 != 0 {
429 random_port = match self.allocation_manager.get_random_even_port().await {
430 Ok(port) => port,
431 Err(err) => {
432 let insufficent_capacity_msg = build_msg(
433 m.transaction_id,
434 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
435 vec![Box::new(ErrorCodeAttribute {
436 code: CODE_INSUFFICIENT_CAPACITY,
437 reason: vec![],
438 })],
439 )?;
440 return build_and_send_err(
441 &self.conn,
442 self.src_addr,
443 insufficent_capacity_msg,
444 err,
445 )
446 .await;
447 }
448 };
449 }
450
451 requested_port = random_port;
452 reservation_token = rand_seq(8);
453 }
454
455 let lifetime_duration = allocation_lifetime(m);
467 let a = match self
468 .allocation_manager
469 .create_allocation(
470 five_tuple,
471 Arc::clone(&self.conn),
472 requested_port,
473 lifetime_duration,
474 )
475 .await
476 {
477 Ok(a) => a,
478 Err(err) => {
479 let insufficent_capacity_msg = build_msg(
480 m.transaction_id,
481 MessageType::new(METHOD_ALLOCATE, CLASS_ERROR_RESPONSE),
482 vec![Box::new(ErrorCodeAttribute {
483 code: CODE_INSUFFICIENT_CAPACITY,
484 reason: vec![],
485 })],
486 )?;
487 return build_and_send_err(
488 &self.conn,
489 self.src_addr,
490 insufficent_capacity_msg,
491 err,
492 )
493 .await;
494 }
495 };
496
497 let (src_ip, src_port) = (self.src_addr.ip(), self.src_addr.port());
509 let (relay_ip, relay_port) = {
510 let a = a.lock().await;
511 (a.relay_addr.ip(), a.relay_addr.port())
512 };
513
514 let msg = {
515 if !reservation_token.is_empty() {
516 self.allocation_manager
517 .create_reservation(reservation_token.clone(), relay_port)
518 .await;
519 }
520
521 let mut response_attrs: Vec<Box<dyn Setter>> = vec![
522 Box::new(RelayedAddress {
523 ip: relay_ip,
524 port: relay_port,
525 }),
526 Box::new(Lifetime(lifetime_duration)),
527 Box::new(XORMappedAddress {
528 ip: src_ip,
529 port: src_port,
530 }),
531 ];
532
533 if !reservation_token.is_empty() {
534 response_attrs.push(Box::new(ReservationToken(
535 reservation_token.as_bytes().to_vec(),
536 )));
537 }
538
539 response_attrs.push(Box::new(message_integrity));
540 build_msg(
541 m.transaction_id,
542 MessageType::new(METHOD_ALLOCATE, CLASS_SUCCESS_RESPONSE),
543 response_attrs,
544 )?
545 };
546
547 build_and_send(&self.conn, self.src_addr, msg).await
548 }
549
550 pub(crate) async fn handle_refresh_request(&mut self, m: &Message) -> Result<(), Error> {
551 log::debug!("received RefreshRequest from {}", self.src_addr);
552
553 let message_integrity =
554 if let Some(mi) = self.authenticate_request(m, METHOD_REFRESH).await? {
555 mi
556 } else {
557 log::debug!("no MessageIntegrity");
558 return Ok(());
559 };
560
561 let lifetime_duration = allocation_lifetime(m);
562 let five_tuple = FiveTuple {
563 src_addr: self.src_addr,
564 dst_addr: self.conn.local_addr().await?,
565 protocol: PROTO_UDP,
566 };
567
568 if lifetime_duration != Duration::from_secs(0) {
569 let a = self.allocation_manager.get_allocation(&five_tuple).await;
570 if let Some(a) = a {
571 let a = a.lock().await;
572 a.refresh(lifetime_duration).await;
573 } else {
574 return Err(ERR_NO_ALLOCATION_FOUND.to_owned());
575 }
576 } else {
577 self.allocation_manager.delete_allocation(&five_tuple).await;
578 }
579
580 let msg = build_msg(
581 m.transaction_id,
582 MessageType::new(METHOD_REFRESH, CLASS_SUCCESS_RESPONSE),
583 vec![
584 Box::new(Lifetime(lifetime_duration)),
585 Box::new(message_integrity),
586 ],
587 )?;
588
589 build_and_send(&self.conn, self.src_addr, msg).await
590 }
591
592 pub(crate) async fn handle_create_permission_request(
593 &mut self,
594 m: &Message,
595 ) -> Result<(), Error> {
596 log::debug!("received CreatePermission from {}", self.src_addr);
597
598 let a = self
599 .allocation_manager
600 .get_allocation(&FiveTuple {
601 src_addr: self.src_addr,
602 dst_addr: self.conn.local_addr().await?,
603 protocol: PROTO_UDP,
604 })
605 .await;
606
607 if let Some(a) = a {
608 let message_integrity = if let Some(mi) = self
609 .authenticate_request(m, METHOD_CREATE_PERMISSION)
610 .await?
611 {
612 mi
613 } else {
614 log::debug!("no MessageIntegrity");
615 return Ok(());
616 };
617 let mut add_count = 0;
618
619 {
620 let a = a.lock().await;
621 for attr in &m.attributes.0 {
622 if attr.typ != ATTR_XOR_PEER_ADDRESS {
623 continue;
624 }
625
626 let mut peer_address = PeerAddress::default();
627 if peer_address.get_from(m).is_err() {
628 add_count = 0;
629 break;
630 }
631
632 log::debug!(
633 "adding permission for {}",
634 format!("{}:{}", peer_address.ip, peer_address.port)
635 );
636
637 a.add_permission(Permission::new(SocketAddr::new(
638 peer_address.ip,
639 peer_address.port,
640 )))
641 .await;
642 add_count += 1;
643 }
644 }
645
646 let mut resp_class = CLASS_SUCCESS_RESPONSE;
647 if add_count == 0 {
648 resp_class = CLASS_ERROR_RESPONSE;
649 }
650
651 let msg = build_msg(
652 m.transaction_id,
653 MessageType::new(METHOD_CREATE_PERMISSION, resp_class),
654 vec![Box::new(message_integrity)],
655 )?;
656
657 build_and_send(&self.conn, self.src_addr, msg).await
658 } else {
659 Err(ERR_NO_ALLOCATION_FOUND.to_owned())
660 }
661 }
662
663 pub(crate) async fn handle_send_indication(&mut self, m: &Message) -> Result<(), Error> {
664 log::debug!("received SendIndication from {}", self.src_addr);
665
666 let a = self
667 .allocation_manager
668 .get_allocation(&FiveTuple {
669 src_addr: self.src_addr,
670 dst_addr: self.conn.local_addr().await?,
671 protocol: PROTO_UDP,
672 })
673 .await;
674
675 if let Some(a) = a {
676 let mut data_attr = Data::default();
677 data_attr.get_from(m)?;
678
679 let mut peer_address = PeerAddress::default();
680 peer_address.get_from(m)?;
681
682 let msg_dst = SocketAddr::new(peer_address.ip, peer_address.port);
683
684 let has_perm = {
685 let a = a.lock().await;
686 a.has_permission(&msg_dst).await
687 };
688 if !has_perm {
689 return Err(ERR_NO_PERMISSION.to_owned());
690 }
691
692 let a = a.lock().await;
693 let l = a.relay_socket.send_to(&data_attr.0, msg_dst).await?;
694 if l != data_attr.0.len() {
695 Err(ERR_SHORT_WRITE.to_owned())
696 } else {
697 Ok(())
698 }
699 } else {
700 Err(ERR_NO_ALLOCATION_FOUND.to_owned())
701 }
702 }
703
704 pub(crate) async fn handle_channel_bind_request(&mut self, m: &Message) -> Result<(), Error> {
705 log::debug!("received ChannelBindRequest from {}", self.src_addr);
706
707 let a = self
708 .allocation_manager
709 .get_allocation(&FiveTuple {
710 src_addr: self.src_addr,
711 dst_addr: self.conn.local_addr().await?,
712 protocol: PROTO_UDP,
713 })
714 .await;
715
716 if let Some(a) = a {
717 let bad_request_msg = build_msg(
718 m.transaction_id,
719 MessageType::new(METHOD_CHANNEL_BIND, CLASS_ERROR_RESPONSE),
720 vec![Box::new(ErrorCodeAttribute {
721 code: CODE_BAD_REQUEST,
722 reason: vec![],
723 })],
724 )?;
725
726 let message_integrity =
727 if let Some(mi) = self.authenticate_request(m, METHOD_CHANNEL_BIND).await? {
728 mi
729 } else {
730 log::debug!("no MessageIntegrity");
731 return Ok(());
732 };
733 let mut channel = ChannelNumber::default();
734 if let Err(err) = channel.get_from(m) {
735 return build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await;
736 }
737
738 let mut peer_addr = PeerAddress::default();
739 if let Err(err) = peer_addr.get_from(m) {
740 return build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await;
741 }
742
743 log::debug!(
744 "binding channel {} to {}",
745 channel,
746 format!("{}:{}", peer_addr.ip, peer_addr.port)
747 );
748
749 let result = {
750 let a = a.lock().await;
751 a.add_channel_bind(
752 ChannelBind::new(channel, SocketAddr::new(peer_addr.ip, peer_addr.port)),
753 self.channel_bind_timeout,
754 )
755 .await
756 };
757 if let Err(err) = result {
758 return build_and_send_err(&self.conn, self.src_addr, bad_request_msg, err).await;
759 }
760
761 let msg = build_msg(
762 m.transaction_id,
763 MessageType::new(METHOD_CHANNEL_BIND, CLASS_SUCCESS_RESPONSE),
764 vec![Box::new(message_integrity)],
765 )?;
766 return build_and_send(&self.conn, self.src_addr, msg).await;
767 } else {
768 Err(ERR_NO_ALLOCATION_FOUND.to_owned())
769 }
770 }
771
772 pub(crate) async fn handle_channel_data(&mut self, c: &ChannelData) -> Result<(), Error> {
773 log::debug!("received ChannelData from {}", self.src_addr);
774
775 let a = self
776 .allocation_manager
777 .get_allocation(&FiveTuple {
778 src_addr: self.src_addr,
779 dst_addr: self.conn.local_addr().await?,
780 protocol: PROTO_UDP,
781 })
782 .await;
783
784 if let Some(a) = a {
785 let a = a.lock().await;
786 let channel = a.get_channel_addr(&c.number).await;
787 if let Some(peer) = channel {
788 let l = a.relay_socket.send_to(&c.data, peer).await?;
789 if l != c.data.len() {
790 Err(ERR_SHORT_WRITE.to_owned())
791 } else {
792 Ok(())
793 }
794 } else {
795 Err(ERR_NO_SUCH_CHANNEL_BIND.to_owned())
796 }
797 } else {
798 Err(ERR_NO_ALLOCATION_FOUND.to_owned())
799 }
800 }
801}
802
803pub(crate) fn rand_seq(n: usize) -> String {
804 let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".as_bytes();
805 let mut buf = vec![0u8; n];
806 for b in &mut buf {
807 *b = letters[rand::random::<usize>() % letters.len()];
808 }
809 if let Ok(s) = String::from_utf8(buf) {
810 s
811 } else {
812 String::new()
813 }
814}
815
816pub(crate) fn build_nonce() -> Result<String, Error> {
817 let mut s = String::new();
819 s.push_str(
820 format!(
821 "{}",
822 SystemTime::now()
823 .duration_since(SystemTime::UNIX_EPOCH)?
824 .as_nanos()
825 )
826 .as_str(),
827 );
828 s.push_str(format!("{}", rand::random::<u64>()).as_str());
829
830 let mut h = Md5::new();
831 h.update(s.as_bytes());
832 Ok(format!("{:x}", h.finalize()))
833}
834
835pub(crate) async fn build_and_send(
836 conn: &Arc<dyn Conn + Send + Sync>,
837 dst: SocketAddr,
838 msg: Message,
839) -> Result<(), Error> {
840 let _ = conn.send_to(&msg.raw, dst).await?;
841 Ok(())
842}
843
844pub(crate) async fn build_and_send_err(
846 conn: &Arc<dyn Conn + Send + Sync>,
847 dst: SocketAddr,
848 msg: Message,
849 err: Error,
850) -> Result<(), Error> {
851 if let Err(send_err) = build_and_send(conn, dst, msg).await {
852 Err(send_err)
853 } else {
854 Err(err)
855 }
856}
857
858pub(crate) fn build_msg(
859 transaction_id: TransactionId,
860 msg_type: MessageType,
861 mut additional: Vec<Box<dyn Setter>>,
862) -> Result<Message, Error> {
863 let mut attrs: Vec<Box<dyn Setter>> = vec![
864 Box::new(Message {
865 transaction_id,
866 ..Default::default()
867 }),
868 Box::new(msg_type),
869 ];
870
871 attrs.append(&mut additional);
872
873 let mut msg = Message::new();
874 msg.build(&attrs)?;
875 Ok(msg)
876}
877
878pub(crate) fn allocation_lifetime(m: &Message) -> Duration {
879 let mut lifetime_duration = DEFAULT_LIFETIME;
880
881 let mut lifetime = Lifetime::default();
882 if lifetime.get_from(m).is_ok() && lifetime.0 < MAXIMUM_ALLOCATION_LIFETIME {
883 lifetime_duration = lifetime.0;
884 }
885
886 lifetime_duration
887}