1use std::{
9 error::Error as StdError,
10 fmt::{Debug, Display},
11 io::{Cursor, Read, Result, Write},
12 num::{ParseFloatError, ParseIntError},
13 sync::atomic::{AtomicU64, Ordering},
14};
15
16use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
17use chrono::Duration;
18use tracing::error;
19
20use crate::{constants, status_code::StatusCode, Context, QualifiedName};
21
22#[derive(Debug, Clone, Default)]
23pub enum DataEncoding {
25 #[default]
26 Binary,
28 XML,
30 JSON,
32 Other(QualifiedName),
34}
35#[derive(Debug, Clone, Default)]
36pub enum BuiltInDataEncoding {
38 #[default]
39 Binary,
41 XML,
43 JSON,
45}
46
47impl DataEncoding {
48 pub fn from_browse_name(name: QualifiedName) -> std::result::Result<Self, StatusCode> {
50 match name.name.as_ref() {
51 "Default Binary" | "" => Ok(Self::Binary),
52 "Default XML" => Ok(Self::XML),
53 "Default JSON" => Ok(Self::JSON),
54 _ if name.namespace_index != 0 => Ok(Self::Other(name)),
55 _ => Err(StatusCode::BadDataEncodingInvalid),
56 }
57 }
58}
59
60pub type EncodingResult<T> = std::result::Result<T, Error>;
62
63#[derive(Debug)]
64pub struct Error {
69 status: StatusCode,
70 request_id: Option<u32>,
71 request_handle: Option<u32>,
72 context: Box<dyn StdError + Send + Sync>,
73}
74
75impl Display for Error {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "{}: {}", self.status(), self.context)
78 }
79}
80
81impl StdError for Error {
82 fn source(&self) -> Option<&(dyn StdError + 'static)> {
83 Some(&*self.context)
84 }
85}
86
87impl Error {
88 pub fn new(status: StatusCode, context: impl Into<Box<dyn StdError + Send + Sync>>) -> Self {
91 Self {
92 status,
93 request_handle: None,
94 request_id: None,
95 context: context.into(),
96 }
97 }
98
99 pub fn decoding(context: impl Into<Box<dyn StdError + Send + Sync>>) -> Self {
102 Self {
103 status: StatusCode::BadDecodingError,
104 request_handle: None,
105 request_id: None,
106 context: context.into(),
107 }
108 }
109
110 pub fn encoding(context: impl Into<Box<dyn StdError + Send + Sync>>) -> Self {
113 Self {
114 status: StatusCode::BadEncodingError,
115 request_handle: None,
116 request_id: None,
117 context: context.into(),
118 }
119 }
120
121 pub fn with_context(mut self, request_id: Option<u32>, request_handle: Option<u32>) -> Self {
123 self.request_id = request_id;
124 self.request_handle = request_handle;
125 self
126 }
127
128 pub fn with_request_id(mut self, id: u32) -> Self {
130 self.request_id = Some(id);
131 self
132 }
133
134 pub fn with_request_handle(mut self, handle: u32) -> Self {
136 self.request_handle = Some(handle);
137 self
138 }
139
140 pub fn maybe_with_request_handle(mut self, handle: Option<u32>) -> Self {
142 if let Some(handle) = handle {
143 self.request_handle = Some(handle);
144 }
145 self
146 }
147
148 pub fn status(&self) -> StatusCode {
150 self.status
151 }
152
153 pub fn full_context(&self) -> Option<(u32, u32)> {
155 if let (Some(id), Some(handle)) = (self.request_id, self.request_handle) {
156 Some((id, handle))
157 } else {
158 None
159 }
160 }
161}
162
163impl From<Error> for StatusCode {
164 fn from(value: Error) -> Self {
165 error!("{}", value);
166 value.status()
167 }
168}
169
170impl From<Error> for std::io::Error {
171 fn from(value: Error) -> Self {
172 value.status().into()
173 }
174}
175
176impl From<std::io::Error> for Error {
177 fn from(value: std::io::Error) -> Self {
178 Self::decoding(value)
179 }
180}
181
182impl From<ParseIntError> for Error {
183 fn from(value: ParseIntError) -> Self {
184 Self::decoding(value)
185 }
186}
187
188impl From<ParseFloatError> for Error {
189 fn from(value: ParseFloatError) -> Self {
190 Self::decoding(value)
191 }
192}
193
194#[derive(Debug)]
197pub struct DepthLock<'a> {
198 depth_gauge: &'a DepthGauge,
199}
200
201impl Drop for DepthLock<'_> {
202 fn drop(&mut self) {
203 self.depth_gauge
206 .current_depth
207 .fetch_sub(1, Ordering::Release);
208 }
209}
210
211impl<'a> DepthLock<'a> {
212 fn new(depth_gauge: &'a DepthGauge) -> (Self, u64) {
213 let current = depth_gauge.current_depth.fetch_add(1, Ordering::Acquire);
214
215 (Self { depth_gauge }, current)
216 }
217
218 pub fn obtain(depth_gauge: &'a DepthGauge) -> core::result::Result<DepthLock<'a>, Error> {
221 let max_depth = depth_gauge.max_depth;
222 let (gauge, val) = Self::new(depth_gauge);
223
224 if val >= max_depth {
225 Err(Error::decoding(
226 "Decoding in stream aborted due maximum recursion depth being reached",
227 ))
228 } else {
229 Ok(gauge)
230 }
231 }
232}
233
234#[derive(Debug)]
237pub struct DepthGauge {
238 pub(self) max_depth: u64,
240 pub(self) current_depth: AtomicU64,
242}
243
244impl Clone for DepthGauge {
247 fn clone(&self) -> Self {
248 Self {
249 max_depth: self.max_depth,
250 current_depth: AtomicU64::new(0),
251 }
252 }
253}
254
255impl Default for DepthGauge {
256 fn default() -> Self {
257 Self::new(constants::MAX_DECODING_DEPTH)
258 }
259}
260
261impl DepthGauge {
262 pub fn new(max_depth: u64) -> Self {
264 Self {
265 max_depth,
266 current_depth: AtomicU64::new(0),
267 }
268 }
269
270 pub fn minimal() -> Self {
272 Self {
273 max_depth: 1,
274 ..Default::default()
275 }
276 }
277
278 pub fn max_depth(&self) -> u64 {
280 self.max_depth
281 }
282}
283
284#[derive(Clone, Debug)]
285pub struct DecodingOptions {
287 pub client_offset: Duration,
290 pub max_message_size: usize,
292 pub max_chunk_count: usize,
294 pub max_string_length: usize,
296 pub max_byte_string_length: usize,
298 pub max_array_length: usize,
300 pub decoding_depth_gauge: DepthGauge,
302}
303
304impl Default for DecodingOptions {
305 fn default() -> Self {
306 DecodingOptions {
307 client_offset: Duration::zero(),
308 max_message_size: constants::MAX_MESSAGE_SIZE,
309 max_chunk_count: constants::MAX_CHUNK_COUNT,
310 max_string_length: constants::MAX_STRING_LENGTH,
311 max_byte_string_length: constants::MAX_BYTE_STRING_LENGTH,
312 max_array_length: constants::MAX_ARRAY_LENGTH,
313 decoding_depth_gauge: DepthGauge::default(),
314 }
315 }
316}
317
318impl DecodingOptions {
319 pub fn minimal() -> Self {
322 DecodingOptions {
323 max_string_length: 8192,
324 max_byte_string_length: 8192,
325 max_array_length: 8192,
326 decoding_depth_gauge: DepthGauge::minimal(),
327 ..Default::default()
328 }
329 }
330
331 pub fn test() -> Self {
333 Self::default()
334 }
335
336 pub fn depth_lock(&self) -> core::result::Result<DepthLock<'_>, Error> {
339 DepthLock::obtain(&self.decoding_depth_gauge)
340 }
341}
342
343pub trait UaNullable {
346 fn is_ua_null(&self) -> bool {
349 false
350 }
351}
352
353impl<T> UaNullable for Option<T>
354where
355 T: UaNullable,
356{
357 fn is_ua_null(&self) -> bool {
358 match self {
359 Some(s) => s.is_ua_null(),
360 None => true,
361 }
362 }
363}
364
365impl<T> UaNullable for Vec<T> where T: UaNullable {}
366impl<T> UaNullable for Box<T>
367where
368 T: UaNullable,
369{
370 fn is_ua_null(&self) -> bool {
371 self.as_ref().is_ua_null()
372 }
373}
374
375macro_rules! is_null_const {
376 ($t:ty, $c:expr) => {
377 impl UaNullable for $t {
378 fn is_ua_null(&self) -> bool {
379 *self == $c
380 }
381 }
382 };
383}
384
385is_null_const!(bool, false);
386is_null_const!(u8, 0);
387is_null_const!(u16, 0);
388is_null_const!(u32, 0);
389is_null_const!(u64, 0);
390is_null_const!(i8, 0);
391is_null_const!(i16, 0);
392is_null_const!(i32, 0);
393is_null_const!(i64, 0);
394is_null_const!(f32, 0.0);
395is_null_const!(f64, 0.0);
396
397impl UaNullable for String {}
398impl UaNullable for str {}
399
400pub trait BinaryEncodable {
416 #[allow(unused)]
419 fn byte_len(&self, ctx: &crate::Context<'_>) -> usize;
420 fn encode<S: Write + ?Sized>(&self, stream: &mut S, ctx: &Context<'_>) -> EncodingResult<()>;
422
423 fn override_encoding(&self) -> Option<BuiltInDataEncoding> {
427 None
428 }
429
430 fn encode_to_vec(&self, ctx: &Context<'_>) -> Vec<u8> {
433 let mut buffer = Cursor::new(Vec::with_capacity(self.byte_len(ctx)));
434 let _ = self.encode(&mut buffer, ctx);
435 buffer.into_inner()
436 }
437}
438
439pub trait BinaryDecodable: Sized {
441 fn decode<S: Read + ?Sized>(stream: &mut S, ctx: &Context<'_>) -> EncodingResult<Self>;
445}
446
447pub trait SimpleBinaryEncodable {
450 #[allow(unused)]
451 fn byte_len(&self) -> usize;
454
455 fn encode<S: Write + ?Sized>(&self, stream: &mut S) -> EncodingResult<()>;
457
458 fn encode_to_vec(&self) -> Vec<u8> {
461 let mut buffer = Cursor::new(Vec::with_capacity(self.byte_len()));
462 let _ = self.encode(&mut buffer);
463 buffer.into_inner()
464 }
465}
466
467impl<T> BinaryEncodable for T
468where
469 T: SimpleBinaryEncodable,
470{
471 fn byte_len(&self, _ctx: &crate::Context<'_>) -> usize {
472 SimpleBinaryEncodable::byte_len(self)
473 }
474
475 fn encode<S: Write + ?Sized>(&self, stream: &mut S, _ctx: &Context<'_>) -> EncodingResult<()> {
476 SimpleBinaryEncodable::encode(self, stream)
477 }
478}
479
480pub trait SimpleBinaryDecodable: Sized {
483 fn decode<S: Read + ?Sized>(
485 stream: &mut S,
486 decoding_options: &DecodingOptions,
487 ) -> EncodingResult<Self>;
488}
489
490impl<T> BinaryDecodable for T
491where
492 T: SimpleBinaryDecodable,
493{
494 fn decode<S: Read + ?Sized>(stream: &mut S, ctx: &Context<'_>) -> EncodingResult<Self> {
495 SimpleBinaryDecodable::decode(stream, ctx.options())
496 }
497}
498
499pub fn process_encode_io_result(result: Result<()>) -> EncodingResult<()> {
501 result.map_err(Error::encoding)
502}
503
504pub fn process_decode_io_result<T>(result: Result<T>) -> EncodingResult<T>
506where
507 T: Debug,
508{
509 result.map_err(Error::decoding)
510}
511
512impl<T> BinaryEncodable for Option<Vec<T>>
513where
514 T: BinaryEncodable,
515{
516 fn byte_len(&self, ctx: &crate::Context<'_>) -> usize {
517 let mut size = 4;
518 if let Some(ref values) = self {
519 size += values.iter().map(|v| v.byte_len(ctx)).sum::<usize>();
520 }
521 size
522 }
523
524 fn encode<S: Write + ?Sized>(&self, stream: &mut S, ctx: &Context<'_>) -> EncodingResult<()> {
525 if let Some(ref values) = self {
526 write_i32(stream, values.len() as i32)?;
527 for value in values.iter() {
528 value.encode(stream, ctx)?;
529 }
530 } else {
531 write_i32(stream, -1)?;
532 }
533 Ok(())
534 }
535}
536
537impl<T> BinaryDecodable for Option<Vec<T>>
538where
539 T: BinaryDecodable,
540{
541 fn decode<S: Read + ?Sized>(
542 stream: &mut S,
543 ctx: &Context<'_>,
544 ) -> EncodingResult<Option<Vec<T>>> {
545 let len = read_i32(stream)?;
546 if len == -1 {
547 Ok(None)
548 } else if len < -1 {
549 Err(Error::decoding(
550 "Array length is negative value and invalid",
551 ))
552 } else if len as usize > ctx.options().max_array_length {
553 Err(Error::decoding(format!(
554 "Array length {} exceeds decoding limit {}",
555 len,
556 ctx.options().max_array_length
557 )))
558 } else {
559 let mut values: Vec<T> = Vec::with_capacity(len as usize);
560 for _ in 0..len {
561 values.push(T::decode(stream, ctx)?);
562 }
563 Ok(Some(values))
564 }
565 }
566}
567
568pub fn byte_len_array<T: BinaryEncodable>(values: &Option<Vec<T>>, ctx: &Context<'_>) -> usize {
570 let mut size = 4;
571 if let Some(ref values) = values {
572 size += values.iter().map(|v| v.byte_len(ctx)).sum::<usize>();
573 }
574 size
575}
576
577pub fn write_bytes<W: Write + ?Sized>(
579 stream: &mut W,
580 value: u8,
581 count: usize,
582) -> EncodingResult<usize> {
583 for _ in 0..count {
584 stream.write_u8(value).map_err(Error::encoding)?;
585 }
586 Ok(count)
587}
588
589pub fn write_u8<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
591where
592 T: Into<u8>,
593{
594 let buf: [u8; 1] = [value.into()];
595 process_encode_io_result(stream.write_all(&buf))
596}
597
598pub fn write_i16<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
600where
601 T: Into<i16>,
602{
603 let mut buf = [0u8; 2];
604 LittleEndian::write_i16(&mut buf, value.into());
605 process_encode_io_result(stream.write_all(&buf))
606}
607
608pub fn write_u16<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
610where
611 T: Into<u16>,
612{
613 let mut buf = [0u8; 2];
614 LittleEndian::write_u16(&mut buf, value.into());
615 process_encode_io_result(stream.write_all(&buf))
616}
617
618pub fn write_i32<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
620where
621 T: Into<i32>,
622{
623 let mut buf = [0u8; 4];
624 LittleEndian::write_i32(&mut buf, value.into());
625 process_encode_io_result(stream.write_all(&buf))
626}
627
628pub fn write_u32<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
630where
631 T: Into<u32>,
632{
633 let mut buf = [0u8; 4];
634 LittleEndian::write_u32(&mut buf, value.into());
635 process_encode_io_result(stream.write_all(&buf))
636}
637
638pub fn write_i64<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
640where
641 T: Into<i64>,
642{
643 let mut buf = [0u8; 8];
644 LittleEndian::write_i64(&mut buf, value.into());
645 process_encode_io_result(stream.write_all(&buf))
646}
647
648pub fn write_u64<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
650where
651 T: Into<u64>,
652{
653 let mut buf = [0u8; 8];
654 LittleEndian::write_u64(&mut buf, value.into());
655 process_encode_io_result(stream.write_all(&buf))
656}
657
658pub fn write_f32<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
660where
661 T: Into<f32>,
662{
663 let mut buf = [0u8; 4];
664 LittleEndian::write_f32(&mut buf, value.into());
665 process_encode_io_result(stream.write_all(&buf))
666}
667
668pub fn write_f64<T, W: Write + ?Sized>(stream: &mut W, value: T) -> EncodingResult<()>
670where
671 T: Into<f64>,
672{
673 let mut buf = [0u8; 8];
674 LittleEndian::write_f64(&mut buf, value.into());
675 process_encode_io_result(stream.write_all(&buf))
676}
677
678pub fn read_bytes<R: Read + ?Sized>(stream: &mut R, buf: &mut [u8]) -> EncodingResult<usize> {
680 let result = stream.read_exact(buf);
681 process_decode_io_result(result)?;
682 Ok(buf.len())
683}
684
685pub fn read_u8<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<u8> {
687 let mut buf = [0u8];
688 let result = stream.read_exact(&mut buf);
689 process_decode_io_result(result)?;
690 Ok(buf[0])
691}
692
693pub fn read_i16<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<i16> {
695 let mut buf = [0u8; 2];
696 let result = stream.read_exact(&mut buf);
697 process_decode_io_result(result)?;
698 Ok(LittleEndian::read_i16(&buf))
699}
700
701pub fn read_u16<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<u16> {
703 let mut buf = [0u8; 2];
704 let result = stream.read_exact(&mut buf);
705 process_decode_io_result(result)?;
706 Ok(LittleEndian::read_u16(&buf))
707}
708
709pub fn read_i32<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<i32> {
711 let mut buf = [0u8; 4];
712 let result = stream.read_exact(&mut buf);
713 process_decode_io_result(result)?;
714 Ok(LittleEndian::read_i32(&buf))
715}
716
717pub fn read_u32<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<u32> {
719 let mut buf = [0u8; 4];
720 let result = stream.read_exact(&mut buf);
721 process_decode_io_result(result)?;
722 Ok(LittleEndian::read_u32(&buf))
723}
724
725pub fn read_i64<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<i64> {
727 let mut buf = [0u8; 8];
728 let result = stream.read_exact(&mut buf);
729 process_decode_io_result(result)?;
730 Ok(LittleEndian::read_i64(&buf))
731}
732
733pub fn read_u64<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<u64> {
735 let mut buf = [0u8; 8];
736 let result = stream.read_exact(&mut buf);
737 process_decode_io_result(result)?;
738 Ok(LittleEndian::read_u64(&buf))
739}
740
741pub fn read_f32<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<f32> {
743 let mut buf = [0u8; 4];
744 let result = stream.read_exact(&mut buf);
745 process_decode_io_result(result)?;
746 Ok(LittleEndian::read_f32(&buf))
747}
748
749pub fn read_f64<R: Read + ?Sized>(stream: &mut R) -> EncodingResult<f64> {
751 let mut buf = [0u8; 8];
752 let result = stream.read_exact(&mut buf);
753 process_decode_io_result(result)?;
754 Ok(LittleEndian::read_f64(&buf))
755}
756
757pub fn skip_bytes<R: Read + ?Sized>(stream: &mut R, bytes: u64) -> EncodingResult<()> {
759 std::io::copy(&mut stream.take(bytes), &mut std::io::sink())?;
760 Ok(())
761}
762
763#[macro_export]
764macro_rules! impl_encoded_as {
769 ($ty:ident, $from:expr, $to:expr, $byte_len:expr) => {
770 impl $crate::SimpleBinaryEncodable for $ty {
771 fn byte_len(&self) -> usize {
772 $byte_len(self)
773 }
774
775 fn encode<S: std::io::Write + ?Sized>(
776 &self,
777 stream: &mut S,
778 ) -> $crate::EncodingResult<()> {
779 $to(self)?.encode(stream)
780 }
781 }
782
783 impl $crate::SimpleBinaryDecodable for $ty {
784 fn decode<S: std::io::Read + ?Sized>(
785 stream: &mut S,
786 decoding_options: &$crate::DecodingOptions,
787 ) -> $crate::EncodingResult<Self> {
788 let inner = $crate::SimpleBinaryDecodable::decode(stream, decoding_options)?;
789 $from(inner)
790 }
791 }
792
793 #[cfg(feature = "json")]
794 impl $crate::json::JsonEncodable for $ty {
795 fn encode(
796 &self,
797 stream: &mut $crate::json::JsonStreamWriter<&mut dyn std::io::Write>,
798 ctx: &$crate::json::Context<'_>,
799 ) -> $crate::EncodingResult<()> {
800 $to(self)?.encode(stream, ctx)
801 }
802 }
803
804 #[cfg(feature = "json")]
805 impl $crate::json::JsonDecodable for $ty {
806 fn decode(
807 stream: &mut $crate::json::JsonStreamReader<&mut dyn std::io::Read>,
808 ctx: &$crate::json::Context<'_>,
809 ) -> $crate::EncodingResult<Self> {
810 let inner = $crate::json::JsonDecodable::decode(stream, ctx)?;
811 $from(inner)
812 }
813 }
814
815 #[cfg(feature = "xml")]
816 impl $crate::xml::XmlEncodable for $ty {
817 fn encode(
818 &self,
819 stream: &mut $crate::xml::XmlStreamWriter<&mut dyn std::io::Write>,
820 ctx: &$crate::xml::Context<'_>,
821 ) -> $crate::EncodingResult<()> {
822 $to(self)?.encode(stream, ctx)
823 }
824 }
825 #[cfg(feature = "xml")]
826 impl $crate::xml::XmlDecodable for $ty {
827 fn decode(
828 stream: &mut $crate::xml::XmlStreamReader<&mut dyn std::io::Read>,
829 ctx: &$crate::xml::Context<'_>,
830 ) -> $crate::EncodingResult<Self> {
831 let inner = $crate::xml::XmlDecodable::decode(stream, ctx)?;
832 $from(inner)
833 }
834 }
835 };
836}
837
838pub use impl_encoded_as;
839
840#[cfg(test)]
841mod tests {
842 use std::sync::Arc;
843
844 use super::{constants, DepthGauge, DepthLock};
845 use crate::StatusCode;
846
847 #[test]
848 fn depth_gauge() {
849 let dg = Arc::new(DepthGauge::default());
850
851 let max_depth = dg.max_depth();
852 assert_eq!(max_depth, constants::MAX_DECODING_DEPTH);
853
854 {
856 let mut v = Vec::new();
857 for _ in 0..max_depth {
858 v.push(DepthLock::obtain(&dg).unwrap());
859 }
860
861 {
863 assert_eq!(
864 dg.current_depth.load(std::sync::atomic::Ordering::Relaxed),
865 max_depth
866 );
867 }
868
869 assert_eq!(
871 DepthLock::obtain(&dg).unwrap_err().status,
872 StatusCode::BadDecodingError
873 );
874
875 }
877
878 {
880 assert_eq!(
881 dg.current_depth.load(std::sync::atomic::Ordering::Relaxed),
882 0
883 );
884 }
885 }
886}