1use bytes::{Buf, Bytes, BytesMut};
2use log::{debug, trace};
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio_util::codec::{Decoder, Encoder};
5
6use crate::error::FramingError;
7
8const EOM_MARKER: &[u8] = b"]]>]]>";
9const EOM_LEN: usize = EOM_MARKER.len();
10
11const CHUNKED_EOM_MARKER: &[u8] = b"\n##\n";
12const CHUNKED_EOM_MARKER_LEN: usize = CHUNKED_EOM_MARKER.len();
13
14const CHUNKED_HEADER_START: &[u8] = b"\n#";
15
16#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum DecodedFrame {
24 Chunk(Bytes),
26 EndOfMessage,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum FramingMode {
38 EndOfMessage,
40 Chunked,
42}
43
44#[derive(Default, Debug, Clone, Copy)]
45pub struct CodecConfig {
46 pub max_message_size: Option<usize>, }
48
49pub struct NetconfCodec {
50 framing_mode: FramingMode,
51 config: CodecConfig,
52 eom_complete: bool,
58 message_bytes: usize,
62 closing: bool,
67}
68
69impl NetconfCodec {
83 pub fn new(framing_mode: FramingMode, config: CodecConfig) -> Self {
84 Self {
85 framing_mode,
86 config,
87 eom_complete: false,
88 message_bytes: 0,
89 closing: false,
90 }
91 }
92
93 pub fn set_mode(&mut self, framing_mode: FramingMode) {
94 self.framing_mode = framing_mode;
95 self.eom_complete = false;
96 self.message_bytes = 0;
97 }
98 pub fn framing_mode(&self) -> FramingMode {
99 self.framing_mode
100 }
101 pub fn set_closing(&mut self) {
104 self.closing = true;
105 }
106
107 fn check_size(&self, size: usize) -> Result<(), FramingError> {
108 if let Some(max_size) = self.config.max_message_size
109 && size > max_size
110 {
111 return Err(FramingError::MessageTooLarge {
112 limit: max_size,
113 received: size,
114 });
115 }
116 Ok(())
117 }
118
119 fn decode_eom(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
127 if self.eom_complete {
130 self.eom_complete = false;
131 self.message_bytes = 0;
132 return Ok(Some(DecodedFrame::EndOfMessage));
133 }
134
135 if src.is_empty() {
136 return Ok(None);
137 }
138
139 if let Some(pos) = memchr::memmem::find(src, EOM_MARKER) {
141 if pos > 0 {
142 let data = src.split_to(pos);
144 src.advance(EOM_LEN);
145 self.message_bytes += data.len();
146 self.check_size(self.message_bytes)?;
147 self.eom_complete = true;
148 debug!("eom: final chunk ({} bytes), marker found", data.len());
149 Ok(Some(DecodedFrame::Chunk(data.freeze())))
150 } else {
151 src.advance(EOM_LEN);
153 self.message_bytes = 0;
154 debug!("eom: empty message");
155 Ok(Some(DecodedFrame::EndOfMessage))
156 }
157 } else {
158 let holdback = EOM_LEN - 1;
161 if src.len() <= holdback {
162 return Ok(None);
163 }
164 let safe_len = src.len() - holdback;
165 let chunk = src.split_to(safe_len);
166 self.message_bytes += chunk.len();
167 self.check_size(self.message_bytes)?;
168 trace!(
169 "eom: yielding chunk ({} bytes), holding back {}",
170 chunk.len(),
171 holdback
172 );
173 Ok(Some(DecodedFrame::Chunk(chunk.freeze())))
174 }
175 }
176
177 fn decode_chunked(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
184 if src.len() < CHUNKED_EOM_MARKER_LEN {
186 trace!(
187 "chunked: buffer too small ({} bytes), need more data",
188 src.len()
189 );
190 return Ok(None);
191 }
192
193 if src[0..2] != *CHUNKED_HEADER_START {
195 return Err(FramingError::InvalidHeader {
196 expected: "\\n#",
197 got: src[..2].to_vec(),
198 });
199 }
200
201 if src[2] == b'#' {
203 if src[3] != b'\n' {
204 return Err(FramingError::InvalidHeader {
205 expected: "\\n##\\n",
206 got: src[..4].to_vec(),
207 });
208 }
209 src.advance(CHUNKED_EOM_MARKER_LEN);
210 self.message_bytes = 0;
211 debug!("chunked: end of message");
212 return Ok(Some(DecodedFrame::EndOfMessage));
213 }
214
215 let header_start = 2; let header_end = match src[header_start..].iter().position(|&b| b == b'\n') {
218 Some(pos_end_of_header) => header_start + pos_end_of_header,
219 None => {
220 if src.len() > 20 {
223 return Err(FramingError::InvalidChunkSize(
224 String::from_utf8_lossy(&src[header_start..]).into_owned(),
225 ));
226 }
227 return Ok(None);
228 }
229 };
230
231 let size_str = &src[header_start..header_end];
233 let chunk_size: usize = std::str::from_utf8(size_str)
234 .map_err(|_| {
235 FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
236 })?
237 .parse()
238 .map_err(|_| {
239 FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
240 })?;
241
242 if chunk_size == 0 {
243 return Err(FramingError::InvalidChunkSize("0".into()));
244 }
245
246 let header_len = header_end + 1; let total_chunk_len = header_len + chunk_size;
251 if src.len() < total_chunk_len {
252 trace!(
253 "chunked: need {} more bytes for chunk (have {}, need {})",
254 total_chunk_len - src.len(),
255 src.len(),
256 total_chunk_len
257 );
258 return Ok(None);
259 }
260 self.message_bytes += chunk_size;
261 self.check_size(self.message_bytes)?;
262
263 trace!("chunked: yielding chunk ({} bytes)", chunk_size);
264
265 src.advance(header_len);
267
268 let chunk_data = src.split_to(chunk_size);
270 Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
271 }
272}
273
274pub(crate) async fn read_eom_message<R: AsyncRead + Unpin>(
275 reader: &mut R,
276 max_size: Option<usize>,
277) -> crate::Result<String> {
278 let mut buf = Vec::with_capacity(4096);
279 let mut tmp = [0u8; 4096];
280
281 loop {
282 let read_bytes = reader.read(&mut tmp).await?;
283
284 if read_bytes == 0 {
285 debug!("read_eom: unexpected EOF after {} bytes", buf.len());
286 return Err(FramingError::UnexpectedEof.into());
287 }
288 buf.extend_from_slice(&tmp[..read_bytes]);
289 trace!(
290 "read_eom: read {} bytes, buffer now {} bytes",
291 read_bytes,
292 buf.len()
293 );
294 if let Some(limit) = max_size
295 && buf.len() > limit + EOM_LEN
296 {
297 return Err(FramingError::MessageTooLarge {
298 limit,
299 received: buf.len(),
300 }
301 .into());
302 }
303 if let Some(pos) = memchr::memmem::find(&buf, EOM_MARKER) {
304 buf.truncate(pos);
305 debug!("read_eom: complete message ({} bytes)", buf.len());
306 return String::from_utf8(buf).map_err(|_| FramingError::InvalidUtf8.into());
307 }
308 }
309}
310
311pub(crate) async fn write_eom_message<W: AsyncWrite + Unpin>(
312 writer: &mut W,
313 message: &str,
314) -> crate::Result<()> {
315 writer.write_all(message.as_bytes()).await?;
316 writer.write_all(EOM_MARKER).await?;
317 writer.flush().await?;
318 Ok(())
319}
320
321pub(crate) fn extract_message_id_from_bytes(bytes: &[u8]) -> Option<u32> {
328 const PATTERN: &[u8] = b"message-id=\"";
329 let pos = memchr::memmem::find(bytes, PATTERN)?;
330 let start = pos + PATTERN.len();
331 let remaining = bytes.get(start..)?;
332 let end = memchr::memchr(b'"', remaining)?;
333 let id_bytes = &remaining[..end];
334 std::str::from_utf8(id_bytes).ok()?.parse().ok()
335}
336
337impl Decoder for NetconfCodec {
339 type Item = DecodedFrame;
340 type Error = FramingError;
341
342 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
343 match self.framing_mode {
344 FramingMode::EndOfMessage => self.decode_eom(src),
345 FramingMode::Chunked => self.decode_chunked(src),
346 }
347 }
348
349 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
356 match self.decode(buf)? {
357 Some(frame) => Ok(Some(frame)),
358 None => {
359 if !buf.is_empty() {
360 if self.closing {
361 debug!(
362 "decode_eof: discarding {} leftover bytes (session closing)",
363 buf.len()
364 );
365 buf.clear();
366 } else {
367 return Err(FramingError::Io(std::io::Error::other(format!(
368 "bytes remaining on stream ({} bytes)",
369 buf.len()
370 ))));
371 }
372 }
373 Ok(None)
374 }
375 }
376 }
377}
378
379impl Encoder<Bytes> for NetconfCodec {
380 type Error = FramingError;
381
382 fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
383 debug!(
384 "encode: framing={:?}, message={} bytes",
385 self.framing_mode,
386 item.len()
387 );
388 trace!(
389 "encode: message preview: {:?}",
390 String::from_utf8_lossy(&item[..item.len().min(200)])
391 );
392 match self.framing_mode {
393 FramingMode::EndOfMessage => {
394 dst.reserve(item.len() + EOM_LEN);
395 dst.extend_from_slice(&item);
396 dst.extend_from_slice(EOM_MARKER);
397 }
398 FramingMode::Chunked => {
399 let header = format!("\n#{}\n", item.len());
400 dst.reserve(header.len() + item.len() + CHUNKED_EOM_MARKER_LEN);
401 dst.extend_from_slice(header.as_bytes());
402 dst.extend_from_slice(&item);
403 dst.extend_from_slice(CHUNKED_EOM_MARKER);
404 }
405 }
406 Ok(())
407 }
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413
414 fn collect_message(codec: &mut NetconfCodec, buf: &mut BytesMut) -> Bytes {
416 let mut result = BytesMut::new();
417 loop {
418 match codec.decode(buf).unwrap() {
419 Some(DecodedFrame::Chunk(chunk)) => result.extend_from_slice(&chunk),
420 Some(DecodedFrame::EndOfMessage) => break,
421 None => panic!("unexpected None before EndOfMessage"),
422 }
423 }
424 result.freeze()
425 }
426
427 #[test]
430 fn eom_decode_complete_message() {
431 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
432 let mut buf = BytesMut::from(&b"<rpc-reply/>]]>]]>"[..]);
433
434 let result = codec.decode(&mut buf).unwrap();
436 assert_eq!(
437 result,
438 Some(DecodedFrame::Chunk(Bytes::from_static(b"<rpc-reply/>")))
439 );
440
441 let result = codec.decode(&mut buf).unwrap();
443 assert_eq!(result, Some(DecodedFrame::EndOfMessage));
444
445 assert!(buf.is_empty());
446 }
447
448 #[test]
449 fn eom_decode_incomplete_message() {
450 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
451 let mut buf = BytesMut::from(&b"<rpc-reply/>"[..]);
452
453 let result = codec.decode(&mut buf).unwrap();
455 assert_eq!(
456 result,
457 Some(DecodedFrame::Chunk(Bytes::from_static(b"<rpc-re")))
458 );
459
460 let result = codec.decode(&mut buf).unwrap();
462 assert_eq!(result, None);
463 }
464
465 #[test]
466 fn eom_decode_partial_marker() {
467 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
468 let mut buf = BytesMut::from(&b"<ok/>]]>"[..]);
469
470 assert_eq!(
472 codec.decode(&mut buf).unwrap(),
473 Some(DecodedFrame::Chunk(Bytes::from_static(b"<ok")))
474 );
475 assert_eq!(codec.decode(&mut buf).unwrap(), None);
477
478 buf.extend_from_slice(b"]]>");
480 assert_eq!(
482 codec.decode(&mut buf).unwrap(),
483 Some(DecodedFrame::Chunk(Bytes::from_static(b"/>")))
484 );
485 assert_eq!(
486 codec.decode(&mut buf).unwrap(),
487 Some(DecodedFrame::EndOfMessage)
488 );
489 }
490
491 #[test]
492 fn eom_decode_empty_message() {
493 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
494 let mut buf = BytesMut::from(&b"]]>]]>"[..]);
495
496 assert_eq!(
498 codec.decode(&mut buf).unwrap(),
499 Some(DecodedFrame::EndOfMessage)
500 );
501 }
502
503 #[test]
504 fn eom_decode_two_messages() {
505 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
506 let mut buf = BytesMut::from(&b"<a/>]]>]]><b/>]]>]]>"[..]);
507
508 assert_eq!(
509 codec.decode(&mut buf).unwrap(),
510 Some(DecodedFrame::Chunk(Bytes::from_static(b"<a/>")))
511 );
512 assert_eq!(
513 codec.decode(&mut buf).unwrap(),
514 Some(DecodedFrame::EndOfMessage)
515 );
516 assert_eq!(
517 codec.decode(&mut buf).unwrap(),
518 Some(DecodedFrame::Chunk(Bytes::from_static(b"<b/>")))
519 );
520 assert_eq!(
521 codec.decode(&mut buf).unwrap(),
522 Some(DecodedFrame::EndOfMessage)
523 );
524 }
525
526 #[test]
527 fn eom_decode_within_holdback() {
528 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
530 let mut buf = BytesMut::from(&b"<ok>"[..]);
531 assert_eq!(codec.decode(&mut buf).unwrap(), None);
532 }
533
534 #[test]
535 fn eom_decode_large_message_streams_chunks() {
536 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
537 let data = "x".repeat(1000);
538 let mut buf = BytesMut::new();
539 buf.extend_from_slice(data.as_bytes());
540 buf.extend_from_slice(EOM_MARKER);
541
542 let collected = collect_message(&mut codec, &mut buf);
544 assert_eq!(collected, Bytes::from(data));
545 }
546
547 #[test]
550 fn chunked_decode_single_chunk() {
551 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
552 let mut buf = BytesMut::from(&b"\n#7\n<data/>\n##\n"[..]);
553
554 assert_eq!(
555 codec.decode(&mut buf).unwrap(),
556 Some(DecodedFrame::Chunk(Bytes::from_static(b"<data/>")))
557 );
558 assert_eq!(
559 codec.decode(&mut buf).unwrap(),
560 Some(DecodedFrame::EndOfMessage)
561 );
562 assert!(buf.is_empty());
563 }
564
565 #[test]
566 fn chunked_decode_multiple_chunks_yields_individually() {
567 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
568 let mut buf = BytesMut::from(&b"\n#5\nHello\n#6\n World\n##\n"[..]);
569
570 assert_eq!(
572 codec.decode(&mut buf).unwrap(),
573 Some(DecodedFrame::Chunk(Bytes::from_static(b"Hello")))
574 );
575 assert_eq!(
576 codec.decode(&mut buf).unwrap(),
577 Some(DecodedFrame::Chunk(Bytes::from_static(b" World")))
578 );
579 assert_eq!(
580 codec.decode(&mut buf).unwrap(),
581 Some(DecodedFrame::EndOfMessage)
582 );
583 }
584
585 #[test]
586 fn chunked_decode_incomplete_header() {
587 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
588 let mut buf = BytesMut::from(&b"\n#"[..]);
589 assert_eq!(codec.decode(&mut buf).unwrap(), None);
590 }
591
592 #[test]
593 fn chunked_decode_incomplete_data() {
594 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
595 let mut buf = BytesMut::from(&b"\n#10\nHello"[..]);
596 assert_eq!(codec.decode(&mut buf).unwrap(), None);
597
598 buf.extend_from_slice(b" Wrld\n##\n");
600 assert_eq!(
601 codec.decode(&mut buf).unwrap(),
602 Some(DecodedFrame::Chunk(Bytes::from_static(b"Hello Wrld")))
603 );
604 assert_eq!(
605 codec.decode(&mut buf).unwrap(),
606 Some(DecodedFrame::EndOfMessage)
607 );
608 }
609
610 #[test]
611 fn chunked_decode_large_chunk() {
612 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
613 let data = "x".repeat(10000);
614 let mut buf = BytesMut::new();
615 buf.extend_from_slice(format!("\n#{}\n", data.len()).as_bytes());
616 buf.extend_from_slice(data.as_bytes());
617 buf.extend_from_slice(b"\n##\n");
618
619 let result = codec.decode(&mut buf).unwrap();
620 match result {
621 Some(DecodedFrame::Chunk(chunk)) => assert_eq!(chunk.len(), 10000),
622 other => panic!("expected Chunk, got {:?}", other),
623 }
624 assert_eq!(
625 codec.decode(&mut buf).unwrap(),
626 Some(DecodedFrame::EndOfMessage)
627 );
628 }
629
630 #[test]
631 fn chunked_decode_invalid_header() {
632 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
633 let mut buf = BytesMut::from(&b"\n#abc\n"[..]);
634 let err = codec.decode(&mut buf).unwrap_err();
635 assert!(matches!(err, FramingError::InvalidChunkSize(_)));
636 }
637
638 #[test]
639 fn chunked_decode_zero_chunk_size() {
640 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
641 let mut buf = BytesMut::from(&b"\n#0\n\n##\n"[..]);
642 let err = codec.decode(&mut buf).unwrap_err();
643 assert!(matches!(err, FramingError::InvalidChunkSize(_)));
644 }
645
646 #[test]
647 fn chunked_decode_size_limit() {
648 let config = CodecConfig {
649 max_message_size: Some(5),
650 };
651 let mut codec = NetconfCodec::new(FramingMode::Chunked, config);
652 let mut buf = BytesMut::from(&b"\n#10\n0123456789\n##\n"[..]);
653 let err = codec.decode(&mut buf).unwrap_err();
654 assert!(matches!(err, FramingError::MessageTooLarge { .. }));
655 }
656
657 #[test]
658 fn chunked_decode_size_limit_cumulative() {
659 let config = CodecConfig {
660 max_message_size: Some(10),
661 };
662 let mut codec = NetconfCodec::new(FramingMode::Chunked, config);
663 let mut buf = BytesMut::from(&b"\n#5\naaaaa\n#5\nbbbbb\n#5\nccccc\n##\n"[..]);
665
666 assert!(codec.decode(&mut buf).unwrap().is_some());
668 assert!(codec.decode(&mut buf).unwrap().is_some());
670 let err = codec.decode(&mut buf).unwrap_err();
672 assert!(matches!(err, FramingError::MessageTooLarge { .. }));
673 }
674
675 #[test]
676 fn eom_decode_size_limit() {
677 let config = CodecConfig {
678 max_message_size: Some(10),
679 };
680 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
681 let mut buf = BytesMut::new();
682 buf.extend_from_slice(b"01234567890123456789]]>]]>");
684 let err = codec.decode(&mut buf).unwrap_err();
685 assert!(matches!(err, FramingError::MessageTooLarge { .. }));
686 }
687
688 #[test]
689 fn eom_decode_size_limit_cumulative() {
690 let config = CodecConfig {
691 max_message_size: Some(10),
692 };
693 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
694
695 let mut buf = BytesMut::from(&b"01234567"[..]);
698 assert!(codec.decode(&mut buf).unwrap().is_some()); buf.extend_from_slice(b"89ABCDEF]]>]]>");
702 let err = codec.decode(&mut buf).unwrap_err();
703 assert!(matches!(err, FramingError::MessageTooLarge { .. }));
704 }
705
706 #[test]
707 fn eom_decode_size_limit_resets_between_messages() {
708 let config = CodecConfig {
709 max_message_size: Some(10),
710 };
711 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
712
713 let mut buf = BytesMut::from(&b"hello]]>]]>"[..]);
715 let collected = collect_message(&mut codec, &mut buf);
716 assert_eq!(collected, Bytes::from_static(b"hello"));
717
718 buf.extend_from_slice(b"world]]>]]>");
720 let collected = collect_message(&mut codec, &mut buf);
721 assert_eq!(collected, Bytes::from_static(b"world"));
722 }
723
724 #[test]
727 fn eom_encode() {
728 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
729 let mut buf = BytesMut::new();
730 codec
731 .encode(Bytes::from_static(b"<ok/>"), &mut buf)
732 .unwrap();
733 assert_eq!(&buf[..], b"<ok/>]]>]]>");
734 }
735
736 #[test]
737 fn chunked_encode() {
738 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
739 let mut buf = BytesMut::new();
740 codec
741 .encode(Bytes::from_static(b"<ok/>"), &mut buf)
742 .unwrap();
743 assert_eq!(&buf[..], b"\n#5\n<ok/>\n##\n");
744 }
745
746 #[test]
749 fn eom_roundtrip() {
750 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
751 let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
752 let mut buf = BytesMut::new();
753 codec.encode(original.clone(), &mut buf).unwrap();
754
755 let collected = collect_message(&mut codec, &mut buf);
756 assert_eq!(collected, original);
757 }
758
759 #[test]
760 fn chunked_roundtrip() {
761 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
762 let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
763 let mut buf = BytesMut::new();
764 codec.encode(original.clone(), &mut buf).unwrap();
765
766 let collected = collect_message(&mut codec, &mut buf);
767 assert_eq!(collected, original);
768 }
769
770 #[test]
771 fn mode_switch() {
772 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
773
774 let mut buf = BytesMut::new();
776 codec
777 .encode(Bytes::from_static(b"hello"), &mut buf)
778 .unwrap();
779 let collected = collect_message(&mut codec, &mut buf);
780 assert_eq!(collected, Bytes::from_static(b"hello"));
781
782 codec.set_mode(FramingMode::Chunked);
784
785 let mut buf = BytesMut::new();
786 codec
787 .encode(Bytes::from_static(b"world"), &mut buf)
788 .unwrap();
789 let collected = collect_message(&mut codec, &mut buf);
790 assert_eq!(collected, Bytes::from_static(b"world"));
791 }
792
793 #[tokio::test]
796 async fn eom_helper_roundtrip() {
797 let (mut client, mut server) = tokio::io::duplex(4096);
798
799 let msg = "<hello/>";
800 tokio::spawn(async move {
801 write_eom_message(&mut server, msg).await.unwrap();
802 });
803
804 let received = read_eom_message(&mut client, None).await.unwrap();
805 assert_eq!(received, msg);
806 }
807
808 #[tokio::test]
809 async fn eom_helper_size_limit() {
810 let (mut client, mut server) = tokio::io::duplex(4096);
811
812 let msg = "x".repeat(1000);
813 tokio::spawn(async move {
814 write_eom_message(&mut server, &msg).await.unwrap();
815 });
816
817 let result = read_eom_message(&mut client, Some(10)).await;
818 assert!(result.is_err());
819 }
820
821 #[test]
824 fn extract_message_id_basic() {
825 let xml =
826 b"<rpc-reply message-id=\"42\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
827 assert_eq!(extract_message_id_from_bytes(xml), Some(42));
828 }
829
830 #[test]
831 fn extract_message_id_large_id() {
832 let xml = b"<rpc-reply message-id=\"4294967295\">";
833 assert_eq!(extract_message_id_from_bytes(xml), Some(4294967295));
834 }
835
836 #[test]
837 fn extract_message_id_with_xml_decl() {
838 let xml = b"<?xml version=\"1.0\"?><rpc-reply message-id=\"7\">";
839 assert_eq!(extract_message_id_from_bytes(xml), Some(7));
840 }
841
842 #[test]
843 fn extract_message_id_missing() {
844 let xml = b"<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
845 assert_eq!(extract_message_id_from_bytes(xml), None);
846 }
847
848 #[test]
849 fn extract_message_id_non_numeric() {
850 let xml = b"<rpc-reply message-id=\"abc\">";
851 assert_eq!(extract_message_id_from_bytes(xml), None);
852 }
853
854 #[test]
855 fn extract_message_id_empty_bytes() {
856 assert_eq!(extract_message_id_from_bytes(b""), None);
857 }
858
859 #[test]
860 fn extract_message_id_partial_header() {
861 let xml = b"<rpc-reply message-id=\"12";
863 assert_eq!(extract_message_id_from_bytes(xml), None);
864 }
865
866 #[test]
869 fn eom_incremental_arrival() {
870 let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
872 let mut buf = BytesMut::new();
873
874 buf.extend_from_slice(b"<ok");
876 assert_eq!(codec.decode(&mut buf).unwrap(), None);
877
878 buf.extend_from_slice(b"/>");
880 assert_eq!(codec.decode(&mut buf).unwrap(), None);
882
883 buf.extend_from_slice(b"]]>]]>");
885 assert_eq!(
887 codec.decode(&mut buf).unwrap(),
888 Some(DecodedFrame::Chunk(Bytes::from_static(b"<ok/>")))
889 );
890 assert_eq!(
891 codec.decode(&mut buf).unwrap(),
892 Some(DecodedFrame::EndOfMessage)
893 );
894 }
895
896 #[test]
897 fn chunked_two_messages_interleaved() {
898 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
899 let mut buf = BytesMut::new();
900 buf.extend_from_slice(b"\n#3\naaa\n##\n\n#3\nbbb\n##\n");
901
902 assert_eq!(
904 codec.decode(&mut buf).unwrap(),
905 Some(DecodedFrame::Chunk(Bytes::from_static(b"aaa")))
906 );
907 assert_eq!(
908 codec.decode(&mut buf).unwrap(),
909 Some(DecodedFrame::EndOfMessage)
910 );
911
912 assert_eq!(
914 codec.decode(&mut buf).unwrap(),
915 Some(DecodedFrame::Chunk(Bytes::from_static(b"bbb")))
916 );
917 assert_eq!(
918 codec.decode(&mut buf).unwrap(),
919 Some(DecodedFrame::EndOfMessage)
920 );
921 }
922
923 #[test]
924 fn chunked_three_chunks_in_one_message() {
925 let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
926 let mut buf = BytesMut::from(&b"\n#1\na\n#1\nb\n#1\nc\n##\n"[..]);
927
928 assert_eq!(
929 codec.decode(&mut buf).unwrap(),
930 Some(DecodedFrame::Chunk(Bytes::from_static(b"a")))
931 );
932 assert_eq!(
933 codec.decode(&mut buf).unwrap(),
934 Some(DecodedFrame::Chunk(Bytes::from_static(b"b")))
935 );
936 assert_eq!(
937 codec.decode(&mut buf).unwrap(),
938 Some(DecodedFrame::Chunk(Bytes::from_static(b"c")))
939 );
940 assert_eq!(
941 codec.decode(&mut buf).unwrap(),
942 Some(DecodedFrame::EndOfMessage)
943 );
944 }
945}