Skip to main content

netconf_rust/
codec.rs

1use bytes::{Buf, Bytes, BytesMut};
2use log::{debug, trace};
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio_util::codec::{Decoder, Encoder};
5
6use crate::error::FramingError;
7
8const EOM_MARKER: &[u8] = b"]]>]]>";
9const EOM_LEN: usize = EOM_MARKER.len();
10
11const CHUNKED_EOM_MARKER: &[u8] = b"\n##\n";
12const CHUNKED_EOM_MARKER_LEN: usize = CHUNKED_EOM_MARKER.len();
13
14const CHUNKED_HEADER_START: &[u8] = b"\n#";
15
16/// A decoded frame from the NETCONF codec.
17///
18/// Instead of returning complete messages, the decoder yields individual
19/// chunks as they arrive. This enables streaming: the reader task can
20/// forward chunks directly to consumers without buffering the entire
21/// message.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum DecodedFrame {
24    /// A piece of the current NETCONF message.
25    Chunk(Bytes),
26    /// The current message is complete.
27    EndOfMessage,
28}
29
30/// Netconf defines 2 framing modes.
31/// 1. Netconf 1.0 defines End of Message (EOM) in RFC 4742, where each message is followed by
32///    the literal sequence `]]>]]>`. For example, `<rpc-reply message-id="1"><ok/></rpc-reply>]]>]]>`
33/// 2. Netconf 1.1 defines Chunked in RFC 6242, where each message is sent with length-prefixed
34///    chunks. The chunk starts with \n#{num_of_bytes_in_msg}\n. \n##\n defines end of message
35///    for example `\n#28\n<rpc-reply><ok/></rpc-re\n#6\nply/>\n##\n`
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum FramingMode {
38    /// NETCONF 1.0 (RFC 4742): messages terminated by `]]>]]`
39    EndOfMessage,
40    /// NETCONF 1.1 (RFC 6242): length-prefixed chunked framing
41    Chunked,
42}
43
44#[derive(Default, Debug, Clone, Copy)]
45pub struct CodecConfig {
46    pub max_message_size: Option<usize>, // None = unlimited
47}
48
49pub struct NetconfCodec {
50    framing_mode: FramingMode,
51    config: CodecConfig,
52    /// Tracks whether we need to yield `EndOfMessage` on the next `decode()`
53    /// call. In EOM mode, when the `]]>]]>` marker is found, the final data
54    /// is yielded as a `Chunk` first, and `EndOfMessage` follows on the next
55    /// call. This two-step yield is needed because `decode()` can only return
56    /// one item at a time.
57    eom_complete: bool,
58    /// Cumulative bytes yielded for the current message. Reset on
59    /// `EndOfMessage`. Used to enforce `max_message_size` in both EOM
60    /// and chunked modes.
61    message_bytes: usize,
62    /// When true, `decode_eof` silently discards leftover bytes instead of
63    /// returning an error. Set by the reader loop after a `close-session` RPC
64    /// is in flight so that harmless holdback bytes don't surface as a
65    /// transport error.
66    closing: bool,
67}
68
69/// tokio_util codec for NETCONF message framing.
70///
71/// Implements the `Decoder` trait from tokio_util. Instead of returning
72/// complete messages, the decoder yields [`DecodedFrame`] items:
73/// individual `Chunk`s as data arrives, followed by `EndOfMessage` when
74/// the message boundary is reached.
75///
76/// This chunk-level output enables the reader task to route messages
77/// without buffering: normal RPCs accumulate chunks into a `BytesMut`,
78/// while streaming RPCs forward each chunk directly to the consumer.
79///
80/// The `Encoder` side is unchanged — it frames complete XML documents
81/// with the appropriate delimiter/chunking.
82impl NetconfCodec {
83    pub fn new(framing_mode: FramingMode, config: CodecConfig) -> Self {
84        Self {
85            framing_mode,
86            config,
87            eom_complete: false,
88            message_bytes: 0,
89            closing: false,
90        }
91    }
92
93    pub fn set_mode(&mut self, framing_mode: FramingMode) {
94        self.framing_mode = framing_mode;
95        self.eom_complete = false;
96        self.message_bytes = 0;
97    }
98    pub fn framing_mode(&self) -> FramingMode {
99        self.framing_mode
100    }
101    /// Mark the codec as closing so that `decode_eof` silently discards
102    /// leftover holdback bytes instead of treating them as an error.
103    pub fn set_closing(&mut self) {
104        self.closing = true;
105    }
106
107    fn check_size(&self, size: usize) -> Result<(), FramingError> {
108        if let Some(max_size) = self.config.max_message_size
109            && size > max_size
110        {
111            return Err(FramingError::MessageTooLarge {
112                limit: max_size,
113                received: size,
114            });
115        }
116        Ok(())
117    }
118
119    /// Decode EOM-framed data into chunks.
120    ///
121    /// Yields available data as `Chunk` frames, holding back `EOM_LEN - 1`
122    /// bytes to handle `]]>]]>` markers split across reads. When the marker
123    /// is found, the final data before it is yielded as a `Chunk`, and
124    /// `EndOfMessage` follows on the next `decode()` call via the
125    /// `eom_complete` flag.
126    fn decode_eom(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
127        // Two-step yield: if we just found the EOM marker and yielded
128        // the final Chunk, now yield EndOfMessage.
129        if self.eom_complete {
130            self.eom_complete = false;
131            self.message_bytes = 0;
132            return Ok(Some(DecodedFrame::EndOfMessage));
133        }
134
135        if src.is_empty() {
136            return Ok(None);
137        }
138
139        // Search for the EOM marker.
140        if let Some(pos) = memchr::memmem::find(src, EOM_MARKER) {
141            if pos > 0 {
142                // Yield data before the marker as a Chunk, schedule EndOfMessage.
143                let data = src.split_to(pos);
144                src.advance(EOM_LEN);
145                self.message_bytes += data.len();
146                self.check_size(self.message_bytes)?;
147                self.eom_complete = true;
148                debug!("eom: final chunk ({} bytes), marker found", data.len());
149                Ok(Some(DecodedFrame::Chunk(data.freeze())))
150            } else {
151                // Marker at the start — empty message.
152                src.advance(EOM_LEN);
153                self.message_bytes = 0;
154                debug!("eom: empty message");
155                Ok(Some(DecodedFrame::EndOfMessage))
156            }
157        } else {
158            // No marker found. Yield everything except the holdback
159            // (EOM_LEN - 1 bytes) since the marker could be split across reads.
160            let holdback = EOM_LEN - 1;
161            if src.len() <= holdback {
162                return Ok(None);
163            }
164            let safe_len = src.len() - holdback;
165            let chunk = src.split_to(safe_len);
166            self.message_bytes += chunk.len();
167            self.check_size(self.message_bytes)?;
168            trace!(
169                "eom: yielding chunk ({} bytes), holding back {}",
170                chunk.len(),
171                holdback
172            );
173            Ok(Some(DecodedFrame::Chunk(chunk.freeze())))
174        }
175    }
176
177    /// Decode chunked-framed data into individual chunks.
178    ///
179    /// Each RFC 6242 chunk `\n#<size>\n<data>` yields a `Chunk(data)`.
180    /// The end marker `\n##\n` yields `EndOfMessage`. Unlike the previous
181    /// implementation, chunks are NOT accumulated — each one is yielded
182    /// individually, enabling streaming.
183    fn decode_chunked(&mut self, src: &mut BytesMut) -> Result<Option<DecodedFrame>, FramingError> {
184        // The smallest possible frame is \n##\n (4 bytes).
185        if src.len() < CHUNKED_EOM_MARKER_LEN {
186            trace!(
187                "chunked: buffer too small ({} bytes), need more data",
188                src.len()
189            );
190            return Ok(None);
191        }
192
193        // Every chunk or end marker starts with \n#
194        if src[0..2] != *CHUNKED_HEADER_START {
195            return Err(FramingError::InvalidHeader {
196                expected: "\\n#",
197                got: src[..2].to_vec(),
198            });
199        }
200
201        // Check for end of chunks marker: \n##\n.
202        if src[2] == b'#' {
203            if src[3] != b'\n' {
204                return Err(FramingError::InvalidHeader {
205                    expected: "\\n##\\n",
206                    got: src[..4].to_vec(),
207                });
208            }
209            src.advance(CHUNKED_EOM_MARKER_LEN);
210            self.message_bytes = 0;
211            debug!("chunked: end of message");
212            return Ok(Some(DecodedFrame::EndOfMessage));
213        }
214
215        // Parse chunk header \n#<size>\n
216        let header_start = 2; // skip \n#
217        let header_end = match src[header_start..].iter().position(|&b| b == b'\n') {
218            Some(pos_end_of_header) => header_start + pos_end_of_header,
219            None => {
220                // Header not yet complete - need more data.
221                // Sanity-check the header length.
222                if src.len() > 20 {
223                    return Err(FramingError::InvalidChunkSize(
224                        String::from_utf8_lossy(&src[header_start..]).into_owned(),
225                    ));
226                }
227                return Ok(None);
228            }
229        };
230
231        // Extract the chunk size from the header and parse into usize
232        let size_str = &src[header_start..header_end];
233        let chunk_size: usize = std::str::from_utf8(size_str)
234            .map_err(|_| {
235                FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
236            })?
237            .parse()
238            .map_err(|_| {
239                FramingError::InvalidChunkSize(String::from_utf8_lossy(size_str).into_owned())
240            })?;
241
242        if chunk_size == 0 {
243            return Err(FramingError::InvalidChunkSize("0".into()));
244        }
245
246        // Total header length: \n# + size digits + \n
247        let header_len = header_end + 1; // +1 for the trailing \n
248
249        // Check if the full chunk (header + data) is available
250        let total_chunk_len = header_len + chunk_size;
251        if src.len() < total_chunk_len {
252            trace!(
253                "chunked: need {} more bytes for chunk (have {}, need {})",
254                total_chunk_len - src.len(),
255                src.len(),
256                total_chunk_len
257            );
258            return Ok(None);
259        }
260        self.message_bytes += chunk_size;
261        self.check_size(self.message_bytes)?;
262
263        trace!("chunked: yielding chunk ({} bytes)", chunk_size);
264
265        // Consume header
266        src.advance(header_len);
267
268        // Extract and yield the chunk data
269        let chunk_data = src.split_to(chunk_size);
270        Ok(Some(DecodedFrame::Chunk(chunk_data.freeze())))
271    }
272}
273
274pub(crate) async fn read_eom_message<R: AsyncRead + Unpin>(
275    reader: &mut R,
276    max_size: Option<usize>,
277) -> crate::Result<String> {
278    let mut buf = Vec::with_capacity(4096);
279    let mut tmp = [0u8; 4096];
280
281    loop {
282        let read_bytes = reader.read(&mut tmp).await?;
283
284        if read_bytes == 0 {
285            debug!("read_eom: unexpected EOF after {} bytes", buf.len());
286            return Err(FramingError::UnexpectedEof.into());
287        }
288        buf.extend_from_slice(&tmp[..read_bytes]);
289        trace!(
290            "read_eom: read {} bytes, buffer now {} bytes",
291            read_bytes,
292            buf.len()
293        );
294        if let Some(limit) = max_size
295            && buf.len() > limit + EOM_LEN
296        {
297            return Err(FramingError::MessageTooLarge {
298                limit,
299                received: buf.len(),
300            }
301            .into());
302        }
303        if let Some(pos) = memchr::memmem::find(&buf, EOM_MARKER) {
304            buf.truncate(pos);
305            debug!("read_eom: complete message ({} bytes)", buf.len());
306            return String::from_utf8(buf).map_err(|_| FramingError::InvalidUtf8.into());
307        }
308    }
309}
310
311pub(crate) async fn write_eom_message<W: AsyncWrite + Unpin>(
312    writer: &mut W,
313    message: &str,
314) -> crate::Result<()> {
315    writer.write_all(message.as_bytes()).await?;
316    writer.write_all(EOM_MARKER).await?;
317    writer.flush().await?;
318    Ok(())
319}
320
321/// Extract the `message-id` attribute value from a NETCONF XML fragment.
322///
323/// Performs a byte-level scan for `message-id="<digits>"` and returns the
324/// parsed `u32`. This avoids pulling in an XML parser for the hot path
325/// in the reader task, where we only need the message-id from the first
326/// chunk to route the message.
327pub(crate) fn extract_message_id_from_bytes(bytes: &[u8]) -> Option<u32> {
328    const PATTERN: &[u8] = b"message-id=\"";
329    let pos = memchr::memmem::find(bytes, PATTERN)?;
330    let start = pos + PATTERN.len();
331    let remaining = bytes.get(start..)?;
332    let end = memchr::memchr(b'"', remaining)?;
333    let id_bytes = &remaining[..end];
334    std::str::from_utf8(id_bytes).ok()?.parse().ok()
335}
336
337/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html>
338impl Decoder for NetconfCodec {
339    type Item = DecodedFrame;
340    type Error = FramingError;
341
342    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
343        match self.framing_mode {
344            FramingMode::EndOfMessage => self.decode_eom(src),
345            FramingMode::Chunked => self.decode_chunked(src),
346        }
347    }
348
349    /// Handle EOF from the underlying transport.
350    ///
351    /// When `closing` is set (a `close-session` RPC is in flight), leftover
352    /// holdback bytes are harmless artifacts of the EOM split-marker guard
353    /// and are silently discarded. Otherwise we fall back to the default
354    /// behaviour: any remaining bytes at EOF indicate a transport problem.
355    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
356        match self.decode(buf)? {
357            Some(frame) => Ok(Some(frame)),
358            None => {
359                if !buf.is_empty() {
360                    if self.closing {
361                        debug!(
362                            "decode_eof: discarding {} leftover bytes (session closing)",
363                            buf.len()
364                        );
365                        buf.clear();
366                    } else {
367                        return Err(FramingError::Io(std::io::Error::other(format!(
368                            "bytes remaining on stream ({} bytes)",
369                            buf.len()
370                        ))));
371                    }
372                }
373                Ok(None)
374            }
375        }
376    }
377}
378
379impl Encoder<Bytes> for NetconfCodec {
380    type Error = FramingError;
381
382    fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
383        debug!(
384            "encode: framing={:?}, message={} bytes",
385            self.framing_mode,
386            item.len()
387        );
388        trace!(
389            "encode: message preview: {:?}",
390            String::from_utf8_lossy(&item[..item.len().min(200)])
391        );
392        match self.framing_mode {
393            FramingMode::EndOfMessage => {
394                dst.reserve(item.len() + EOM_LEN);
395                dst.extend_from_slice(&item);
396                dst.extend_from_slice(EOM_MARKER);
397            }
398            FramingMode::Chunked => {
399                let header = format!("\n#{}\n", item.len());
400                dst.reserve(header.len() + item.len() + CHUNKED_EOM_MARKER_LEN);
401                dst.extend_from_slice(header.as_bytes());
402                dst.extend_from_slice(&item);
403                dst.extend_from_slice(CHUNKED_EOM_MARKER);
404            }
405        }
406        Ok(())
407    }
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413
414    /// Helper: decode all frames until EndOfMessage and concatenate the chunks.
415    fn collect_message(codec: &mut NetconfCodec, buf: &mut BytesMut) -> Bytes {
416        let mut result = BytesMut::new();
417        loop {
418            match codec.decode(buf).unwrap() {
419                Some(DecodedFrame::Chunk(chunk)) => result.extend_from_slice(&chunk),
420                Some(DecodedFrame::EndOfMessage) => break,
421                None => panic!("unexpected None before EndOfMessage"),
422            }
423        }
424        result.freeze()
425    }
426
427    // ── EOM Decoder tests ───────────────────────────────────────────────
428
429    #[test]
430    fn eom_decode_complete_message() {
431        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
432        let mut buf = BytesMut::from(&b"<rpc-reply/>]]>]]>"[..]);
433
434        // First decode: yields the data before the marker as a Chunk
435        let result = codec.decode(&mut buf).unwrap();
436        assert_eq!(
437            result,
438            Some(DecodedFrame::Chunk(Bytes::from_static(b"<rpc-reply/>")))
439        );
440
441        // Second decode: yields EndOfMessage
442        let result = codec.decode(&mut buf).unwrap();
443        assert_eq!(result, Some(DecodedFrame::EndOfMessage));
444
445        assert!(buf.is_empty());
446    }
447
448    #[test]
449    fn eom_decode_incomplete_message() {
450        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
451        let mut buf = BytesMut::from(&b"<rpc-reply/>"[..]);
452
453        // 12 bytes available, holdback is 5. Safe to emit: 7 bytes.
454        let result = codec.decode(&mut buf).unwrap();
455        assert_eq!(
456            result,
457            Some(DecodedFrame::Chunk(Bytes::from_static(b"<rpc-re")))
458        );
459
460        // Remaining 5 bytes <= holdback, need more data.
461        let result = codec.decode(&mut buf).unwrap();
462        assert_eq!(result, None);
463    }
464
465    #[test]
466    fn eom_decode_partial_marker() {
467        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
468        let mut buf = BytesMut::from(&b"<ok/>]]>"[..]);
469
470        // 8 bytes, holdback 5, safe 3 — yield first chunk
471        assert_eq!(
472            codec.decode(&mut buf).unwrap(),
473            Some(DecodedFrame::Chunk(Bytes::from_static(b"<ok")))
474        );
475        // Remaining 5 bytes <= holdback
476        assert_eq!(codec.decode(&mut buf).unwrap(), None);
477
478        // Complete the marker
479        buf.extend_from_slice(b"]]>");
480        // Now buf is b"/>]]>]]>" — marker found at pos 2
481        assert_eq!(
482            codec.decode(&mut buf).unwrap(),
483            Some(DecodedFrame::Chunk(Bytes::from_static(b"/>")))
484        );
485        assert_eq!(
486            codec.decode(&mut buf).unwrap(),
487            Some(DecodedFrame::EndOfMessage)
488        );
489    }
490
491    #[test]
492    fn eom_decode_empty_message() {
493        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
494        let mut buf = BytesMut::from(&b"]]>]]>"[..]);
495
496        // Marker at position 0 — empty message, yield EndOfMessage directly.
497        assert_eq!(
498            codec.decode(&mut buf).unwrap(),
499            Some(DecodedFrame::EndOfMessage)
500        );
501    }
502
503    #[test]
504    fn eom_decode_two_messages() {
505        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
506        let mut buf = BytesMut::from(&b"<a/>]]>]]><b/>]]>]]>"[..]);
507
508        assert_eq!(
509            codec.decode(&mut buf).unwrap(),
510            Some(DecodedFrame::Chunk(Bytes::from_static(b"<a/>")))
511        );
512        assert_eq!(
513            codec.decode(&mut buf).unwrap(),
514            Some(DecodedFrame::EndOfMessage)
515        );
516        assert_eq!(
517            codec.decode(&mut buf).unwrap(),
518            Some(DecodedFrame::Chunk(Bytes::from_static(b"<b/>")))
519        );
520        assert_eq!(
521            codec.decode(&mut buf).unwrap(),
522            Some(DecodedFrame::EndOfMessage)
523        );
524    }
525
526    #[test]
527    fn eom_decode_within_holdback() {
528        // Buffer smaller than holdback — need more data.
529        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
530        let mut buf = BytesMut::from(&b"<ok>"[..]);
531        assert_eq!(codec.decode(&mut buf).unwrap(), None);
532    }
533
534    #[test]
535    fn eom_decode_large_message_streams_chunks() {
536        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
537        let data = "x".repeat(1000);
538        let mut buf = BytesMut::new();
539        buf.extend_from_slice(data.as_bytes());
540        buf.extend_from_slice(EOM_MARKER);
541
542        // Collect all chunks — should reconstruct the original data.
543        let collected = collect_message(&mut codec, &mut buf);
544        assert_eq!(collected, Bytes::from(data));
545    }
546
547    // ── Chunked Decoder tests ───────────────────────────────────────────
548
549    #[test]
550    fn chunked_decode_single_chunk() {
551        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
552        let mut buf = BytesMut::from(&b"\n#7\n<data/>\n##\n"[..]);
553
554        assert_eq!(
555            codec.decode(&mut buf).unwrap(),
556            Some(DecodedFrame::Chunk(Bytes::from_static(b"<data/>")))
557        );
558        assert_eq!(
559            codec.decode(&mut buf).unwrap(),
560            Some(DecodedFrame::EndOfMessage)
561        );
562        assert!(buf.is_empty());
563    }
564
565    #[test]
566    fn chunked_decode_multiple_chunks_yields_individually() {
567        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
568        let mut buf = BytesMut::from(&b"\n#5\nHello\n#6\n World\n##\n"[..]);
569
570        // Each chunk is yielded individually (not accumulated).
571        assert_eq!(
572            codec.decode(&mut buf).unwrap(),
573            Some(DecodedFrame::Chunk(Bytes::from_static(b"Hello")))
574        );
575        assert_eq!(
576            codec.decode(&mut buf).unwrap(),
577            Some(DecodedFrame::Chunk(Bytes::from_static(b" World")))
578        );
579        assert_eq!(
580            codec.decode(&mut buf).unwrap(),
581            Some(DecodedFrame::EndOfMessage)
582        );
583    }
584
585    #[test]
586    fn chunked_decode_incomplete_header() {
587        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
588        let mut buf = BytesMut::from(&b"\n#"[..]);
589        assert_eq!(codec.decode(&mut buf).unwrap(), None);
590    }
591
592    #[test]
593    fn chunked_decode_incomplete_data() {
594        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
595        let mut buf = BytesMut::from(&b"\n#10\nHello"[..]);
596        assert_eq!(codec.decode(&mut buf).unwrap(), None);
597
598        // Complete the data + end marker
599        buf.extend_from_slice(b" Wrld\n##\n");
600        assert_eq!(
601            codec.decode(&mut buf).unwrap(),
602            Some(DecodedFrame::Chunk(Bytes::from_static(b"Hello Wrld")))
603        );
604        assert_eq!(
605            codec.decode(&mut buf).unwrap(),
606            Some(DecodedFrame::EndOfMessage)
607        );
608    }
609
610    #[test]
611    fn chunked_decode_large_chunk() {
612        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
613        let data = "x".repeat(10000);
614        let mut buf = BytesMut::new();
615        buf.extend_from_slice(format!("\n#{}\n", data.len()).as_bytes());
616        buf.extend_from_slice(data.as_bytes());
617        buf.extend_from_slice(b"\n##\n");
618
619        let result = codec.decode(&mut buf).unwrap();
620        match result {
621            Some(DecodedFrame::Chunk(chunk)) => assert_eq!(chunk.len(), 10000),
622            other => panic!("expected Chunk, got {:?}", other),
623        }
624        assert_eq!(
625            codec.decode(&mut buf).unwrap(),
626            Some(DecodedFrame::EndOfMessage)
627        );
628    }
629
630    #[test]
631    fn chunked_decode_invalid_header() {
632        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
633        let mut buf = BytesMut::from(&b"\n#abc\n"[..]);
634        let err = codec.decode(&mut buf).unwrap_err();
635        assert!(matches!(err, FramingError::InvalidChunkSize(_)));
636    }
637
638    #[test]
639    fn chunked_decode_zero_chunk_size() {
640        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
641        let mut buf = BytesMut::from(&b"\n#0\n\n##\n"[..]);
642        let err = codec.decode(&mut buf).unwrap_err();
643        assert!(matches!(err, FramingError::InvalidChunkSize(_)));
644    }
645
646    #[test]
647    fn chunked_decode_size_limit() {
648        let config = CodecConfig {
649            max_message_size: Some(5),
650        };
651        let mut codec = NetconfCodec::new(FramingMode::Chunked, config);
652        let mut buf = BytesMut::from(&b"\n#10\n0123456789\n##\n"[..]);
653        let err = codec.decode(&mut buf).unwrap_err();
654        assert!(matches!(err, FramingError::MessageTooLarge { .. }));
655    }
656
657    #[test]
658    fn chunked_decode_size_limit_cumulative() {
659        let config = CodecConfig {
660            max_message_size: Some(10),
661        };
662        let mut codec = NetconfCodec::new(FramingMode::Chunked, config);
663        // Three 5-byte chunks: cumulative 15 bytes exceeds the 10-byte limit
664        let mut buf = BytesMut::from(&b"\n#5\naaaaa\n#5\nbbbbb\n#5\nccccc\n##\n"[..]);
665
666        // First chunk (5 bytes) — under limit
667        assert!(codec.decode(&mut buf).unwrap().is_some());
668        // Second chunk (cumulative 10 bytes) — at limit
669        assert!(codec.decode(&mut buf).unwrap().is_some());
670        // Third chunk (cumulative 15 bytes) — exceeds limit
671        let err = codec.decode(&mut buf).unwrap_err();
672        assert!(matches!(err, FramingError::MessageTooLarge { .. }));
673    }
674
675    #[test]
676    fn eom_decode_size_limit() {
677        let config = CodecConfig {
678            max_message_size: Some(10),
679        };
680        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
681        let mut buf = BytesMut::new();
682        // 20 bytes of data exceeds the 10-byte limit
683        buf.extend_from_slice(b"01234567890123456789]]>]]>");
684        let err = codec.decode(&mut buf).unwrap_err();
685        assert!(matches!(err, FramingError::MessageTooLarge { .. }));
686    }
687
688    #[test]
689    fn eom_decode_size_limit_cumulative() {
690        let config = CodecConfig {
691            max_message_size: Some(10),
692        };
693        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
694
695        // First batch: 8 bytes of data, no marker yet. Holdback keeps 5,
696        // so 3 bytes are yielded — under the limit.
697        let mut buf = BytesMut::from(&b"01234567"[..]);
698        assert!(codec.decode(&mut buf).unwrap().is_some()); // yields 3 bytes
699
700        // Second batch: more data + marker. Cumulative now exceeds 10.
701        buf.extend_from_slice(b"89ABCDEF]]>]]>");
702        let err = codec.decode(&mut buf).unwrap_err();
703        assert!(matches!(err, FramingError::MessageTooLarge { .. }));
704    }
705
706    #[test]
707    fn eom_decode_size_limit_resets_between_messages() {
708        let config = CodecConfig {
709            max_message_size: Some(10),
710        };
711        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, config);
712
713        // First message: 5 bytes — under the limit
714        let mut buf = BytesMut::from(&b"hello]]>]]>"[..]);
715        let collected = collect_message(&mut codec, &mut buf);
716        assert_eq!(collected, Bytes::from_static(b"hello"));
717
718        // Second message: also 5 bytes — should succeed (counter was reset)
719        buf.extend_from_slice(b"world]]>]]>");
720        let collected = collect_message(&mut codec, &mut buf);
721        assert_eq!(collected, Bytes::from_static(b"world"));
722    }
723
724    // ── Encoder tests ───────────────────────────────────────────────────
725
726    #[test]
727    fn eom_encode() {
728        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
729        let mut buf = BytesMut::new();
730        codec
731            .encode(Bytes::from_static(b"<ok/>"), &mut buf)
732            .unwrap();
733        assert_eq!(&buf[..], b"<ok/>]]>]]>");
734    }
735
736    #[test]
737    fn chunked_encode() {
738        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
739        let mut buf = BytesMut::new();
740        codec
741            .encode(Bytes::from_static(b"<ok/>"), &mut buf)
742            .unwrap();
743        assert_eq!(&buf[..], b"\n#5\n<ok/>\n##\n");
744    }
745
746    // ── Roundtrip tests ─────────────────────────────────────────────────
747
748    #[test]
749    fn eom_roundtrip() {
750        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
751        let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
752        let mut buf = BytesMut::new();
753        codec.encode(original.clone(), &mut buf).unwrap();
754
755        let collected = collect_message(&mut codec, &mut buf);
756        assert_eq!(collected, original);
757    }
758
759    #[test]
760    fn chunked_roundtrip() {
761        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
762        let original = Bytes::from_static(b"<rpc message-id=\"1\"><get/></rpc>");
763        let mut buf = BytesMut::new();
764        codec.encode(original.clone(), &mut buf).unwrap();
765
766        let collected = collect_message(&mut codec, &mut buf);
767        assert_eq!(collected, original);
768    }
769
770    #[test]
771    fn mode_switch() {
772        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
773
774        // Encode/decode in EOM mode
775        let mut buf = BytesMut::new();
776        codec
777            .encode(Bytes::from_static(b"hello"), &mut buf)
778            .unwrap();
779        let collected = collect_message(&mut codec, &mut buf);
780        assert_eq!(collected, Bytes::from_static(b"hello"));
781
782        // Switch to chunked
783        codec.set_mode(FramingMode::Chunked);
784
785        let mut buf = BytesMut::new();
786        codec
787            .encode(Bytes::from_static(b"world"), &mut buf)
788            .unwrap();
789        let collected = collect_message(&mut codec, &mut buf);
790        assert_eq!(collected, Bytes::from_static(b"world"));
791    }
792
793    // ── EOM hello helpers tests ─────────────────────────────────────────
794
795    #[tokio::test]
796    async fn eom_helper_roundtrip() {
797        let (mut client, mut server) = tokio::io::duplex(4096);
798
799        let msg = "<hello/>";
800        tokio::spawn(async move {
801            write_eom_message(&mut server, msg).await.unwrap();
802        });
803
804        let received = read_eom_message(&mut client, None).await.unwrap();
805        assert_eq!(received, msg);
806    }
807
808    #[tokio::test]
809    async fn eom_helper_size_limit() {
810        let (mut client, mut server) = tokio::io::duplex(4096);
811
812        let msg = "x".repeat(1000);
813        tokio::spawn(async move {
814            write_eom_message(&mut server, &msg).await.unwrap();
815        });
816
817        let result = read_eom_message(&mut client, Some(10)).await;
818        assert!(result.is_err());
819    }
820
821    // ── extract_message_id_from_bytes tests ─────────────────────────────
822
823    #[test]
824    fn extract_message_id_basic() {
825        let xml =
826            b"<rpc-reply message-id=\"42\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
827        assert_eq!(extract_message_id_from_bytes(xml), Some(42));
828    }
829
830    #[test]
831    fn extract_message_id_large_id() {
832        let xml = b"<rpc-reply message-id=\"4294967295\">";
833        assert_eq!(extract_message_id_from_bytes(xml), Some(4294967295));
834    }
835
836    #[test]
837    fn extract_message_id_with_xml_decl() {
838        let xml = b"<?xml version=\"1.0\"?><rpc-reply message-id=\"7\">";
839        assert_eq!(extract_message_id_from_bytes(xml), Some(7));
840    }
841
842    #[test]
843    fn extract_message_id_missing() {
844        let xml = b"<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">";
845        assert_eq!(extract_message_id_from_bytes(xml), None);
846    }
847
848    #[test]
849    fn extract_message_id_non_numeric() {
850        let xml = b"<rpc-reply message-id=\"abc\">";
851        assert_eq!(extract_message_id_from_bytes(xml), None);
852    }
853
854    #[test]
855    fn extract_message_id_empty_bytes() {
856        assert_eq!(extract_message_id_from_bytes(b""), None);
857    }
858
859    #[test]
860    fn extract_message_id_partial_header() {
861        // Only have the start of the attribute, no closing quote
862        let xml = b"<rpc-reply message-id=\"12";
863        assert_eq!(extract_message_id_from_bytes(xml), None);
864    }
865
866    // ── Chunk-level decode tests (new) ──────────────────────────────────
867
868    #[test]
869    fn eom_incremental_arrival() {
870        // Simulate data arriving in small pieces.
871        let mut codec = NetconfCodec::new(FramingMode::EndOfMessage, CodecConfig::default());
872        let mut buf = BytesMut::new();
873
874        // First batch: just a few bytes (within holdback)
875        buf.extend_from_slice(b"<ok");
876        assert_eq!(codec.decode(&mut buf).unwrap(), None);
877
878        // Second batch: more data, now exceeds holdback
879        buf.extend_from_slice(b"/>");
880        // buf = b"<ok/>" (5 bytes), exactly holdback — still None
881        assert_eq!(codec.decode(&mut buf).unwrap(), None);
882
883        // Third batch: data + marker
884        buf.extend_from_slice(b"]]>]]>");
885        // buf = b"<ok/>]]>]]>" — marker found at pos 5
886        assert_eq!(
887            codec.decode(&mut buf).unwrap(),
888            Some(DecodedFrame::Chunk(Bytes::from_static(b"<ok/>")))
889        );
890        assert_eq!(
891            codec.decode(&mut buf).unwrap(),
892            Some(DecodedFrame::EndOfMessage)
893        );
894    }
895
896    #[test]
897    fn chunked_two_messages_interleaved() {
898        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
899        let mut buf = BytesMut::new();
900        buf.extend_from_slice(b"\n#3\naaa\n##\n\n#3\nbbb\n##\n");
901
902        // First message
903        assert_eq!(
904            codec.decode(&mut buf).unwrap(),
905            Some(DecodedFrame::Chunk(Bytes::from_static(b"aaa")))
906        );
907        assert_eq!(
908            codec.decode(&mut buf).unwrap(),
909            Some(DecodedFrame::EndOfMessage)
910        );
911
912        // Second message
913        assert_eq!(
914            codec.decode(&mut buf).unwrap(),
915            Some(DecodedFrame::Chunk(Bytes::from_static(b"bbb")))
916        );
917        assert_eq!(
918            codec.decode(&mut buf).unwrap(),
919            Some(DecodedFrame::EndOfMessage)
920        );
921    }
922
923    #[test]
924    fn chunked_three_chunks_in_one_message() {
925        let mut codec = NetconfCodec::new(FramingMode::Chunked, CodecConfig::default());
926        let mut buf = BytesMut::from(&b"\n#1\na\n#1\nb\n#1\nc\n##\n"[..]);
927
928        assert_eq!(
929            codec.decode(&mut buf).unwrap(),
930            Some(DecodedFrame::Chunk(Bytes::from_static(b"a")))
931        );
932        assert_eq!(
933            codec.decode(&mut buf).unwrap(),
934            Some(DecodedFrame::Chunk(Bytes::from_static(b"b")))
935        );
936        assert_eq!(
937            codec.decode(&mut buf).unwrap(),
938            Some(DecodedFrame::Chunk(Bytes::from_static(b"c")))
939        );
940        assert_eq!(
941            codec.decode(&mut buf).unwrap(),
942            Some(DecodedFrame::EndOfMessage)
943        );
944    }
945}