1use bytes::{Buf, Bytes, BytesMut};
2use log::{debug, trace, warn};
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio_util::codec::{Decoder, Encoder};
5
6use crate::error::FramingError;
7
/// Selects how tolerant the chunked decoder is towards peers that declare a
/// chunk size which does not match the payload they actually send.
///
/// With any variant other than [`Off`](Self::Off) the decoder looks for the
/// next chunk header / end-of-chunks marker in the buffered data and compares
/// its position with the declared size.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum LenientChunkedFraming {
    /// Strict framing: the declared chunk size is always trusted.
    #[default]
    Off,
    /// Recover when the declared size is smaller than the real payload.
    RecoverUndercount,
    /// Recover when the declared size is larger than the real payload.
    RecoverOvercount,
    /// Recover from both kinds of mismatch.
    RecoverBoth,
}

impl LenientChunkedFraming {
    /// Returns `true` for every variant except [`Off`](Self::Off).
    fn is_enabled(&self) -> bool {
        !matches!(self, Self::Off)
    }

    /// Returns `true` when a too-small declared size should be corrected.
    fn recovers_undercount(&self) -> bool {
        match self {
            Self::RecoverUndercount | Self::RecoverBoth => true,
            Self::Off | Self::RecoverOvercount => false,
        }
    }

    /// Returns `true` when a too-large declared size should be corrected.
    fn recovers_overcount(&self) -> bool {
        match self {
            Self::RecoverOvercount | Self::RecoverBoth => true,
            Self::Off | Self::RecoverUndercount => false,
        }
    }
}
46
/// Delimiter that terminates a message in end-of-message framing.
const EOM_MARKER: &[u8] = b"]]>]]>";
/// Length of [`EOM_MARKER`] in bytes.
const EOM_LEN: usize = EOM_MARKER.len();

/// Marker that terminates a message in chunked framing.
const CHUNKED_EOM_MARKER: &[u8] = b"\n##\n";
/// Length of [`CHUNKED_EOM_MARKER`] in bytes.
const CHUNKED_EOM_MARKER_LEN: usize = CHUNKED_EOM_MARKER.len();

/// Prefix shared by every chunk header and by the end-of-chunks marker.
const CHUNKED_HEADER_START: &[u8] = b"\n#";
54
55#[derive(Debug, Clone, PartialEq, Eq)]
62pub enum DecodedFrame {
63 Chunk(Bytes),
65 EndOfMessage,
67}
68
/// Wire framing used to delimit messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FramingMode {
    /// Each message is followed by the `]]>]]>` delimiter.
    EndOfMessage,
    /// Each message is a series of `\n#<size>\n` prefixed chunks terminated
    /// by `\n##\n`.
    Chunked,
}
82
83#[derive(Default, Debug, Clone, Copy)]
84pub struct CodecConfig {
85 pub max_message_size: Option<usize>, pub lenient_chunked_framing: LenientChunkedFraming,
87}
88
89pub struct NetconfCodec {
90 framing_mode: FramingMode,
91 config: CodecConfig,
92 eom_complete: bool,
98 message_bytes: usize,
102 closing: bool,
107}
108
109impl NetconfCodec {
123 pub fn new(framing_mode: FramingMode, config: CodecConfig) -> Self {
124 Self {
125 framing_mode,
126 config,
127 eom_complete: false,
128 message_bytes: 0,
129 closing: false,
130 }
131 }
132
133 pub fn set_mode(&mut self, framing_mode: FramingMode) {
134 self.framing_mode = framing_mode;
135 self.eom_complete = false;
136 self.message_bytes = 0;
137 }
138 pub fn framing_mode(&self) -> FramingMode {
139 self.framing_mode
140 }
141 pub fn set_closing(&mut self) {
144 self.closing = true;
145 }
146
147 fn check_size(&self, size: usize) -> Result<(), FramingError> {
148 if let Some(max_size) = self.config.max_message_size
149 && size > max_size
150 {
151 return Err(FramingError::MessageTooLarge {
152 limit: max_size,
153 received: size,
154 });
155 }
156 Ok(())
157 }
158
159 fn decode_eom(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
167 if self.eom_complete {
170 self.eom_complete = false;
171 self.message_bytes = 0;
172 return Ok(Some(DecodedFrame::EndOfMessage));
173 }
174
175 if src.is_empty() {
176 return Ok(None);
177 }
178
179 if let Some(pos) = memchr::memmem::find(src, EOM_MARKER) {
181 if pos > 0 {
182 let data = src.split_to(pos);
184 src.advance(EOM_LEN);
185 self.message_bytes += data.len();
186 self.check_size(self.message_bytes)?;
187 self.eom_complete = true;
188 debug!("eom: final chunk ({} bytes), marker found", data.len());
189 Ok(Some(DecodedFrame::Chunk(data.freeze())))
190 } else {
191 src.advance(EOM_LEN);
193 self.message_bytes = 0;
194 debug!("eom: empty message");
195 Ok(Some(DecodedFrame::EndOfMessage))
196 }
197 } else {
198 let holdback = EOM_LEN - 1;
201 if src.len() <= holdback {
202 return Ok(None);
203 }
204 let safe_len = src.len() - holdback;
205 let chunk = src.split_to(safe_len);
206 self.message_bytes += chunk.len();
207 self.check_size(self.message_bytes)?;
208 trace!(
209 "eom: yielding chunk ({} bytes), holding back {}",
210 chunk.len(),
211 holdback
212 );
213 Ok(Some(DecodedFrame::Chunk(chunk.freeze())))
214 }
215 }
216
217 fn decode_chunked(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
224 if src.len() < CHUNKED_EOM_MARKER_LEN {
226 trace!(
227 "chunked: buffer too small ({} bytes), need more data",
228 src.len()
229 );
230 return Ok(None);
231 }
232
233 if src[0..2] != *CHUNKED_HEADER_START {
235 return Err(FramingError::InvalidHeader {
236 expected: "\\n#",
237 got: src[..2].to_vec(),
238 });
239 }
240
241 if src[2] == b'#' {
243 if src[3] != b'\n' {
244 return Err(FramingError::InvalidHeader {
245 expected: "\\n##\\n",
246 got: src[..4].to_vec(),
247 });
248 }
249 src.advance(CHUNKED_EOM_MARKER_LEN);
250 self.message_bytes = 0;
251 debug!("chunked: end of message");
252 return Ok(Some(DecodedFrame::EndOfMessage));
253 }
254
255 let header_start = 2; let header_end = match src[header_start..].iter().position(|&b| b == b'\n') {
258 Some(pos_end_of_header) => header_start + pos_end_of_header,
259 None => {
260 if src.len() > 20 {
263 return Err(FramingError::InvalidChunkSize(
264 String::from_utf8_lossy(&src[header_start..]).into_owned(),
265 ));
266 }
267 return Ok(None);
268 }
269 };
270
271 let size_str = &src[header_start..header_end];
273 let chunk_size: usize = std::str::from_utf8(size_str)
274 .map_err(|_| {
275 FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
276 })?
277 .parse()
278 .map_err(|_| {
279 FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
280 })?;
281
282 if chunk_size == 0 {
283 return Err(FramingError::InvalidChunkSize("0".into()));
284 }
285
286 let header_len = header_end + 1; let total_chunk_len = header_len + chunk_size;
291
292 if self.config.lenient_chunked_framing.is_enabled() {
293 return self.decode_chunked_lenient(src, header_len, chunk_size);
294 }
295
296 if src.len() < total_chunk_len {
297 trace!(
298 "chunked: need {} more bytes for chunk (have {}, need {})",
299 total_chunk_len - src.len(),
300 src.len(),
301 total_chunk_len
302 );
303 return Ok(None);
304 }
305 self.message_bytes += chunk_size;
306 self.check_size(self.message_bytes)?;
307
308 trace!("chunked: yielding chunk ({} bytes)", chunk_size);
309
310 src.advance(header_len);
312
313 let chunk_data = src.split_to(chunk_size);
315 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
316 }
317
318 fn decode_chunked_lenient(
321 &mut self,
322 src: &mut BytesMut,
323 header_len: usize,
324 chunk_size: usize,
325 ) -> Result<Option<DecodedFrame>, FramingError> {
326 let data_start = header_len;
327 let declared_end = data_start + chunk_size;
328 let mode = self.config.lenient_chunked_framing;
329
330 if src.len() >= declared_end + 2
333 && src[declared_end] == b'\n'
334 && src[declared_end + 1] == b'#'
335 {
336 if src.len() < data_start + chunk_size {
338 return Ok(None);
339 }
340 self.message_bytes += chunk_size;
341 self.check_size(self.message_bytes)?;
342 trace!(
343 "chunked-lenient: fast path, declared size correct ({} bytes)",
344 chunk_size
345 );
346 src.advance(header_len);
347 let chunk_data = src.split_to(chunk_size);
348 return Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())));
349 }
350
351 match Self::find_real_chunk_boundary(src, data_start) {
353 Some(real_size) => {
354 if real_size == chunk_size {
355 self.message_bytes += chunk_size;
359 self.check_size(self.message_bytes)?;
360 trace!(
361 "chunked-lenient: scan confirmed declared size ({} bytes)",
362 chunk_size
363 );
364 src.advance(header_len);
365 let chunk_data = src.split_to(chunk_size);
366 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
367 } else if real_size > chunk_size && mode.recovers_undercount() {
368 warn!(
369 "chunked-lenient: undercount recovery, declared={} actual={}",
370 chunk_size, real_size
371 );
372 self.message_bytes += real_size;
373 self.check_size(self.message_bytes)?;
374 src.advance(header_len);
375 let chunk_data = src.split_to(real_size);
376 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
377 } else if real_size < chunk_size && mode.recovers_overcount() {
378 warn!(
379 "chunked-lenient: overcount recovery, declared={} actual={}",
380 chunk_size, real_size
381 );
382 self.message_bytes += real_size;
383 self.check_size(self.message_bytes)?;
384 src.advance(header_len);
385 let chunk_data = src.split_to(real_size);
386 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
387 } else {
388 let total_chunk_len = header_len + chunk_size;
391 if src.len() < total_chunk_len {
392 return Ok(None);
393 }
394 self.message_bytes += chunk_size;
395 self.check_size(self.message_bytes)?;
396 src.advance(header_len);
397 let chunk_data = src.split_to(chunk_size);
398 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
399 }
400 }
401 None => {
402 Ok(None)
404 }
405 }
406 }
407
408 fn find_real_chunk_boundary(src: &BytesMut, data_start: usize) -> Option<usize> {
418 let data = &src[data_start..];
419 let finder = memchr::memmem::Finder::new(b"\n#");
420 let mut search_start = 0;
421
422 loop {
423 let candidate = match finder.find(&data[search_start..]) {
424 Some(pos) => search_start + pos,
425 None => return None,
426 };
427
428 let after_hash = candidate + 2;
430 if after_hash >= data.len() {
431 return None;
433 }
434
435 let next_byte = data[after_hash];
436
437 if next_byte == b'#' {
438 if after_hash + 1 >= data.len() {
440 return None; }
442 if data[after_hash + 1] == b'\n' {
443 return Some(candidate);
444 }
445 search_start = after_hash;
447 continue;
448 }
449
450 if next_byte.is_ascii_digit() {
451 let mut i = after_hash + 1;
453 loop {
454 if i >= data.len() {
455 return None; }
457 if data[i] == b'\n' {
458 return Some(candidate);
460 }
461 if !data[i].is_ascii_digit() {
462 break;
464 }
465 i += 1;
466 }
467 search_start = after_hash;
468 continue;
469 }
470
471 search_start = after_hash;
473 }
474 }
475}
476
477pub(crate) async fn read_eom_message<R: AsyncRead + Unpin>(
478 reader: &mut R,
479 max_size: Option<usize>,
480) -> crate::Result<String> {
481 let mut buf = Vec::with_capacity(4096);
482 let mut tmp = [0u8; 4096];
483
484 loop {
485 let read_bytes = reader.read(&mut tmp).await?;
486
487 if read_bytes == 0 {
488 debug!("read_eom: unexpected EOF after {} bytes", buf.len());
489 return Err(FramingError::UnexpectedEof.into());
490 }
491 buf.extend_from_slice(&tmp[..read_bytes]);
492 trace!(
493 "read_eom: read {} bytes, buffer now {} bytes",
494 read_bytes,
495 buf.len()
496 );
497 if let Some(limit) = max_size
498 && buf.len() > limit + EOM_LEN
499 {
500 return Err(FramingError::MessageTooLarge {
501 limit,
502 received: buf.len(),
503 }
504 .into());
505 }
506 if let Some(pos) = memchr::memmem::find(&buf, EOM_MARKER) {
507 buf.truncate(pos);
508 debug!("read_eom: complete message ({} bytes)", buf.len());
509 return String::from_utf8(buf).map_err(|_| FramingError::InvalidUtf8.into());
510 }
511 }
512}
513
514pub(crate) async fn write_eom_message<W: AsyncWrite + Unpin>(
515 writer: &mut W,
516 message: &str,
517) -> crate::Result<()> {
518 writer.write_all(message.as_bytes()).await?;
519 writer.write_all(EOM_MARKER).await?;
520 writer.flush().await?;
521 Ok(())
522}
523
524pub(crate) fn extract_message_id_from_bytes(bytes: &[u8]) -> Option<u32> {
531 const PATTERN: &[u8] = b"message-id=\"";
532 let pos = memchr::memmem::find(bytes, PATTERN)?;
533 let start = pos + PATTERN.len();
534 let remaining = bytes.get(start..)?;
535 let end = memchr::memchr(b'"', remaining)?;
536 let id_bytes = &remaining[..end];
537 std::str::from_utf8(id_bytes).ok()?.parse().ok()
538}
539
540impl Decoder for NetconfCodec {
542 type Item = DecodedFrame;
543 type Error = FramingError;
544
545 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
546 match self.framing_mode {
547 FramingMode::EndOfMessage => self.decode_eom(src),
548 FramingMode::Chunked => self.decode_chunked(src),
549 }
550 }
551
552 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
559 match self.decode(buf)? {
560 Some(frame) => Ok(Some(frame)),
561 None => {
562 if !buf.is_empty() {
563 if self.closing {
564 debug!(
565 "decode_eof: discarding {} leftover bytes (session closing)",
566 buf.len()
567 );
568 buf.clear();
569 } else {
570 return Err(FramingError::Io(std::io::Error::other(format!(
571 "bytes remaining on stream ({} bytes)",
572 buf.len()
573 ))));
574 }
575 }
576 Ok(None)
577 }
578 }
579 }
580}
581
582impl Encoder<Bytes> for NetconfCodec {
583 type Error = FramingError;
584
585 fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
586 debug!(
587 "encode: framing={:?}, message={} bytes",
588 self.framing_mode,
589 item.len()
590 );
591 trace!(
592 "encode: message preview: {:?}",
593 String::from_utf8_lossy(&item[..item.len().min(200)])
594 );
595 match self.framing_mode {
596 FramingMode::EndOfMessage => {
597 dst.reserve(item.len() + EOM_LEN);
598 dst.extend_from_slice(&item);
599 dst.extend_from_slice(EOM_MARKER);
600 }
601 FramingMode::Chunked => {
602 let header = format!("\n#{}\n", item.len());
603 dst.reserve(header.len() + item.len() + CHUNKED_EOM_MARKER_LEN);
604 dst.extend_from_slice(header.as_bytes());
605 dst.extend_from_slice(&item);
606 dst.extend_from_slice(CHUNKED_EOM_MARKER);
607 }
608 }
609 Ok(())
610 }
611}
612
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------

    /// Codec in end-of-message framing with the default configuration.
    fn eom_codec() -> NetconfCodec {
        NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default())
    }

    /// Codec in strict chunked framing with the default configuration.
    fn chunked_codec() -> NetconfCodec {
        NetconfCodec::new(FramingMode::Chunked, CodecConfig::default())
    }

    /// Codec with a message size limit and otherwise default settings.
    fn limited_codec(framing_mode: FramingMode, limit: usize) -> NetconfCodec {
        let config = CodecConfig {
            max_message_size: Some(limit),
            ..Default::default()
        };
        NetconfCodec::new(framing_mode, config)
    }

    /// Configuration without a size limit and with the given leniency.
    fn lenient_config(mode: LenientChunkedFraming) -> CodecConfig {
        CodecConfig {
            max_message_size: None,
            lenient_chunked_framing: mode,
        }
    }

    /// Chunked codec using the given leniency and no size limit.
    fn lenient_codec(mode: LenientChunkedFraming) -> NetconfCodec {
        NetconfCodec::new(FramingMode::Chunked, lenient_config(mode))
    }

    /// Expected decoder output for a payload frame.
    fn chunk(payload: &'static [u8]) -> Option<DecodedFrame> {
        Some(DecodedFrame::Chunk(Bytes::from_static(payload)))
    }

    /// Expected decoder output for the end of a message.
    fn end_of_message() -> Option<DecodedFrame> {
        Some(DecodedFrame::EndOfMessage)
    }

    /// Calls `decode` once per expected entry and compares the results.
    fn expect_frames(
        codec: &mut NetconfCodec,
        buf: &mut BytesMut,
        expected: &[Option<DecodedFrame>],
    ) {
        for want in expected {
            assert_eq!(codec.decode(buf).unwrap(), *want);
        }
    }

    /// Asserts that the next `decode` call fails with `MessageTooLarge`.
    fn expect_too_large(codec: &mut NetconfCodec, buf: &mut BytesMut) {
        let err = codec.decode(buf).unwrap_err();
        assert!(matches!(err, FramingError::MessageTooLarge { .. }));
    }

    /// Drains frames until `EndOfMessage` and returns the joined payload.
    fn collect_message(codec: &mut NetconfCodec, buf: &mut BytesMut) -> Bytes {
        let mut result = BytesMut::new();
        loop {
            match codec.decode(buf).unwrap() {
                Some(DecodedFrame::Chunk(chunk)) => result.extend_from_slice(&chunk),
                Some(DecodedFrame::EndOfMessage) => break,
                None => panic!("unexpected None before EndOfMessage"),
            }
        }
        result.freeze()
    }

    // ---------------------------------------------------------------
    // End-of-message decoding
    // ---------------------------------------------------------------

    #[test]
    fn eom_decode_complete_message() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"<rpc-reply/>]]>]]>"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"<rpc-reply/>"), end_of_message()],
        );
        assert!(buf.is_empty());
    }

    #[test]
    fn eom_decode_incomplete_message() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"<rpc-reply/>"[..]);

        expect_frames(&mut codec, &mut buf, &[chunk(b"<rpc-re"), None]);
    }

    #[test]
    fn eom_decode_partial_marker() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"<ok/>]]>"[..]);

        expect_frames(&mut codec, &mut buf, &[chunk(b"<ok"), None]);

        buf.extend_from_slice(b"]]>");
        expect_frames(&mut codec, &mut buf, &[chunk(b"/>"), end_of_message()]);
    }

    #[test]
    fn eom_decode_empty_message() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"]]>]]>"[..]);

        expect_frames(&mut codec, &mut buf, &[end_of_message()]);
    }

    #[test]
    fn eom_decode_two_messages() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"<a/>]]>]]><b/>]]>]]>"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[
                chunk(b"<a/>"),
                end_of_message(),
                chunk(b"<b/>"),
                end_of_message(),
            ],
        );
    }

    #[test]
    fn eom_decode_within_holdback() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::from(&b"<ok>"[..]);

        expect_frames(&mut codec, &mut buf, &[None]);
    }

    #[test]
    fn eom_decode_large_message_streams_chunks() {
        let mut codec = eom_codec();
        let data = "x".repeat(1000);
        let mut buf = BytesMut::new();
        buf.extend_from_slice(data.as_bytes());
        buf.extend_from_slice(EOM_MARKER);

        let collected = collect_message(&mut codec, &mut buf);
        assert_eq!(collected, Bytes::from(data));
    }

    // ---------------------------------------------------------------
    // Chunked decoding
    // ---------------------------------------------------------------

    #[test]
    fn chunked_decode_single_chunk() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#7\n<data/>\n##\n"[..]);

        expect_frames(&mut codec, &mut buf, &[chunk(b"<data/>"), end_of_message()]);
        assert!(buf.is_empty());
    }

    #[test]
    fn chunked_decode_multiple_chunks_yields_individually() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#5\nHello\n#6\n World\n##\n"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello"), chunk(b" World"), end_of_message()],
        );
    }

    #[test]
    fn chunked_decode_incomplete_header() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#"[..]);

        expect_frames(&mut codec, &mut buf, &[None]);
    }

    #[test]
    fn chunked_decode_incomplete_data() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#10\nHello"[..]);
        expect_frames(&mut codec, &mut buf, &[None]);

        buf.extend_from_slice(b" Wrld\n##\n");
        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello Wrld"), end_of_message()],
        );
    }

    #[test]
    fn chunked_decode_large_chunk() {
        let mut codec = chunked_codec();
        let data = "x".repeat(10000);
        let mut buf = BytesMut::new();
        buf.extend_from_slice(format!("\n#{}\n", data.len()).as_bytes());
        buf.extend_from_slice(data.as_bytes());
        buf.extend_from_slice(b"\n##\n");

        match codec.decode(&mut buf).unwrap() {
            Some(DecodedFrame::Chunk(chunk)) => assert_eq!(chunk.len(), 10000),
            other => panic!("expected Chunk, got {:?}", other),
        }
        expect_frames(&mut codec, &mut buf, &[end_of_message()]);
    }

    #[test]
    fn chunked_decode_invalid_header() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#abc\n"[..]);

        let err = codec.decode(&mut buf).unwrap_err();
        assert!(matches!(err, FramingError::InvalidChunkSize(_)));
    }

    #[test]
    fn chunked_decode_zero_chunk_size() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#0\n\n##\n"[..]);

        let err = codec.decode(&mut buf).unwrap_err();
        assert!(matches!(err, FramingError::InvalidChunkSize(_)));
    }

    // ---------------------------------------------------------------
    // Size limits
    // ---------------------------------------------------------------

    #[test]
    fn chunked_decode_size_limit() {
        let mut codec = limited_codec(FramingMode::Chunked, 5);
        let mut buf = BytesMut::from(&b"\n#10\n0123456789\n##\n"[..]);

        expect_too_large(&mut codec, &mut buf);
    }

    #[test]
    fn chunked_decode_size_limit_cumulative() {
        let mut codec = limited_codec(FramingMode::Chunked, 10);
        let mut buf = BytesMut::from(&b"\n#5\naaaaa\n#5\nbbbbb\n#5\nccccc\n##\n"[..]);

        // The first two chunks stay within the limit; the third exceeds it.
        assert!(codec.decode(&mut buf).unwrap().is_some());
        assert!(codec.decode(&mut buf).unwrap().is_some());
        expect_too_large(&mut codec, &mut buf);
    }

    #[test]
    fn eom_decode_size_limit() {
        let mut codec = limited_codec(FramingMode::EndOfMessage, 10);
        let mut buf = BytesMut::new();
        buf.extend_from_slice(b"01234567890123456789]]>]]>");

        expect_too_large(&mut codec, &mut buf);
    }

    #[test]
    fn eom_decode_size_limit_cumulative() {
        let mut codec = limited_codec(FramingMode::EndOfMessage, 10);

        let mut buf = BytesMut::from(&b"01234567"[..]);
        assert!(codec.decode(&mut buf).unwrap().is_some());

        buf.extend_from_slice(b"89ABCDEF]]>]]>");
        expect_too_large(&mut codec, &mut buf);
    }

    #[test]
    fn eom_decode_size_limit_resets_between_messages() {
        let mut codec = limited_codec(FramingMode::EndOfMessage, 10);

        let mut buf = BytesMut::from(&b"hello]]>]]>"[..]);
        let collected = collect_message(&mut codec, &mut buf);
        assert_eq!(collected, Bytes::from_static(b"hello"));

        buf.extend_from_slice(b"world]]>]]>");
        let collected = collect_message(&mut codec, &mut buf);
        assert_eq!(collected, Bytes::from_static(b"world"));
    }

    // ---------------------------------------------------------------
    // Encoding and round trips
    // ---------------------------------------------------------------

    #[test]
    fn eom_encode() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::new();
        codec
            .encode(Bytes::from_static(b"<ok/>"), &mut buf)
            .unwrap();
        assert_eq!(&buf[..], b"<ok/>]]>]]>");
    }

    #[test]
    fn chunked_encode() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::new();
        codec
            .encode(Bytes::from_static(b"<ok/>"), &mut buf)
            .unwrap();
        assert_eq!(&buf[..], b"\n#5\n<ok/>\n##\n");
    }

    #[test]
    fn eom_roundtrip() {
        let mut codec = eom_codec();
        let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
        let mut buf = BytesMut::new();
        codec.encode(original.clone(), &mut buf).unwrap();

        assert_eq!(collect_message(&mut codec, &mut buf), original);
    }

    #[test]
    fn chunked_roundtrip() {
        let mut codec = chunked_codec();
        let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
        let mut buf = BytesMut::new();
        codec.encode(original.clone(), &mut buf).unwrap();

        assert_eq!(collect_message(&mut codec, &mut buf), original);
    }

    #[test]
    fn mode_switch() {
        let mut codec = eom_codec();

        let mut buf = BytesMut::new();
        codec
            .encode(Bytes::from_static(b"hello"), &mut buf)
            .unwrap();
        assert_eq!(
            collect_message(&mut codec, &mut buf),
            Bytes::from_static(b"hello")
        );

        codec.set_mode(FramingMode::Chunked);

        let mut buf = BytesMut::new();
        codec
            .encode(Bytes::from_static(b"world"), &mut buf)
            .unwrap();
        assert_eq!(
            collect_message(&mut codec, &mut buf),
            Bytes::from_static(b"world")
        );
    }

    // ---------------------------------------------------------------
    // Async end-of-message helpers
    // ---------------------------------------------------------------

    #[tokio::test]
    async fn eom_helper_roundtrip() {
        let (mut client, mut server) = tokio::io::duplex(4096);

        let msg = "<hello/>";
        tokio::spawn(async move {
            write_eom_message(&mut server, msg).await.unwrap();
        });

        let received = read_eom_message(&mut client, None).await.unwrap();
        assert_eq!(received, msg);
    }

    #[tokio::test]
    async fn eom_helper_size_limit() {
        let (mut client, mut server) = tokio::io::duplex(4096);

        let msg = "x".repeat(1000);
        tokio::spawn(async move {
            write_eom_message(&mut server, &msg).await.unwrap();
        });

        let result = read_eom_message(&mut client, Some(10)).await;
        assert!(result.is_err());
    }

    // ---------------------------------------------------------------
    // message-id extraction
    // ---------------------------------------------------------------

    #[test]
    fn extract_message_id_basic() {
        let xml =
            b"<rpc-reply message-id=\"42\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
        assert_eq!(extract_message_id_from_bytes(xml), Some(42));
    }

    #[test]
    fn extract_message_id_large_id() {
        let xml = b"<rpc-reply message-id=\"4294967295\">";
        assert_eq!(extract_message_id_from_bytes(xml), Some(4294967295));
    }

    #[test]
    fn extract_message_id_with_xml_decl() {
        let xml = b"<?xml version=\"1.0\"?><rpc-reply message-id=\"7\">";
        assert_eq!(extract_message_id_from_bytes(xml), Some(7));
    }

    #[test]
    fn extract_message_id_missing() {
        let xml = b"<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
        assert_eq!(extract_message_id_from_bytes(xml), None);
    }

    #[test]
    fn extract_message_id_non_numeric() {
        let xml = b"<rpc-reply message-id=\"abc\">";
        assert_eq!(extract_message_id_from_bytes(xml), None);
    }

    #[test]
    fn extract_message_id_empty_bytes() {
        assert_eq!(extract_message_id_from_bytes(b""), None);
    }

    #[test]
    fn extract_message_id_partial_header() {
        let xml = b"<rpc-reply message-id=\"12";
        assert_eq!(extract_message_id_from_bytes(xml), None);
    }

    // ---------------------------------------------------------------
    // Incremental arrival and multi-message buffers
    // ---------------------------------------------------------------

    #[test]
    fn eom_incremental_arrival() {
        let mut codec = eom_codec();
        let mut buf = BytesMut::new();

        buf.extend_from_slice(b"<ok");
        expect_frames(&mut codec, &mut buf, &[None]);

        buf.extend_from_slice(b"/>");
        expect_frames(&mut codec, &mut buf, &[None]);

        buf.extend_from_slice(b"]]>]]>");
        expect_frames(&mut codec, &mut buf, &[chunk(b"<ok/>"), end_of_message()]);
    }

    #[test]
    fn chunked_two_messages_interleaved() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::new();
        buf.extend_from_slice(b"\n#3\naaa\n##\n\n#3\nbbb\n##\n");

        expect_frames(
            &mut codec,
            &mut buf,
            &[
                chunk(b"aaa"),
                end_of_message(),
                chunk(b"bbb"),
                end_of_message(),
            ],
        );
    }

    #[test]
    fn chunked_three_chunks_in_one_message() {
        let mut codec = chunked_codec();
        let mut buf = BytesMut::from(&b"\n#1\na\n#1\nb\n#1\nc\n##\n"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"a"), chunk(b"b"), chunk(b"c"), end_of_message()],
        );
    }

    // ---------------------------------------------------------------
    // Lenient chunked framing
    // ---------------------------------------------------------------

    #[test]
    fn chunked_lenient_correct_size_fast_path() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverBoth);
        let mut buf = BytesMut::from(&b"\n#5\nHello\n#6\n World\n##\n"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello"), chunk(b" World"), end_of_message()],
        );
    }

    #[test]
    fn chunked_lenient_undercount_recovery() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverUndercount);
        let mut buf = BytesMut::from(&b"\n#5\nHello World\n##\n"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello World"), end_of_message()],
        );
    }

    #[test]
    fn chunked_lenient_overcount_recovery() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverOvercount);
        let mut buf = BytesMut::from(&b"\n#20\nHello\n#3\nEnd\n##\n"[..]);

        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello"), chunk(b"End"), end_of_message()],
        );
    }

    #[test]
    fn chunked_lenient_both_recovery() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverBoth);

        // Declared size too small.
        let mut buf = BytesMut::from(&b"\n#2\nHello\n##\n"[..]);
        expect_frames(&mut codec, &mut buf, &[chunk(b"Hello"), end_of_message()]);

        codec.message_bytes = 0;

        // Declared size too large.
        let mut buf = BytesMut::from(&b"\n#20\nWorld\n##\n"[..]);
        expect_frames(&mut codec, &mut buf, &[chunk(b"World"), end_of_message()]);
    }

    #[test]
    fn chunked_lenient_undercount_only_ignores_overcount() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverUndercount);
        let mut buf = BytesMut::from(&b"\n#20\nHello\n##\n"[..]);

        expect_frames(&mut codec, &mut buf, &[None]);
    }

    #[test]
    fn chunked_lenient_xml_with_hash_false_positive() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverBoth);
        let data: &'static [u8] = b"data\n#comment\nend";
        let mut buf = BytesMut::new();
        buf.extend_from_slice(format!("\n#5\n").as_bytes());
        buf.extend_from_slice(data);
        buf.extend_from_slice(b"\n##\n");

        expect_frames(&mut codec, &mut buf, &[chunk(data), end_of_message()]);
    }

    #[test]
    fn chunked_lenient_incremental_arrival() {
        let mut codec = lenient_codec(LenientChunkedFraming::RecoverBoth);

        let mut buf = BytesMut::from(&b"\n#5\nHel"[..]);
        expect_frames(&mut codec, &mut buf, &[None]);

        buf.extend_from_slice(b"lo Wor");
        expect_frames(&mut codec, &mut buf, &[None]);

        buf.extend_from_slice(b"ld\n##\n");
        expect_frames(
            &mut codec,
            &mut buf,
            &[chunk(b"Hello World"), end_of_message()],
        );
    }

    #[test]
    fn chunked_lenient_size_limit_uses_actual_size() {
        let config = CodecConfig {
            max_message_size: Some(5),
            lenient_chunked_framing: LenientChunkedFraming::RecoverUndercount,
        };
        let mut codec = NetconfCodec::new(FramingMode::Chunked, config);
        let mut buf = BytesMut::from(&b"\n#3\n0123456789\n##\n"[..]);

        expect_too_large(&mut codec, &mut buf);
    }
}