async_snmp/client/
walk.rs

1//! Walk stream implementations.
2
3#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
/// Strategy used to traverse an OID subtree.
///
/// The type is a plain, field-less enum: it is cheap to copy, comparable, and
/// hashable, so it can be stored in configuration structs or used as a map key.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum WalkMode {
    /// Pick the operation from the protocol version (default):
    /// V1 uses GETNEXT, V2c/V3 uses GETBULK.
    #[default]
    Auto,
    /// Always use GETNEXT (slower but more compatible).
    GetNext,
    /// Always use GETBULK (faster, errors on v1).
    GetBulk,
}
33
/// How a walk reacts when an agent returns OIDs that are not strictly increasing.
///
/// SNMP walks depend on agents answering in strictly increasing lexicographic
/// order. Some buggy agents break that rule by returning OIDs out of order, or
/// by repeating an OID (which would otherwise make the walk loop forever).
///
/// This enum selects how the library handles such violations:
///
/// - [`Strict`](Self::Strict) (default): terminates immediately with
///   [`Error::NonIncreasingOid`](crate::Error::NonIncreasingOid) on any violation.
///   Use this unless you know the agent has ordering bugs.
///
/// - [`AllowNonIncreasing`](Self::AllowNonIncreasing): tolerates out-of-order
///   OIDs but remembers every OID seen so cycles can be detected. Returns
///   [`Error::DuplicateOid`](crate::Error::DuplicateOid) if the same OID appears twice.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum OidOrdering {
    /// Require strictly increasing OIDs (default).
    ///
    /// The walk ends with [`Error::NonIncreasingOid`](crate::Error::NonIncreasingOid)
    /// on the first violation. Cheapest mode: only the previous OID is kept, so
    /// memory is O(1) and each item costs a single comparison.
    #[default]
    Strict,

    /// Allow non-increasing OIDs, with cycle detection.
    ///
    /// Some buggy agents return OIDs out of order. In this mode every OID seen
    /// is recorded in a `HashSet`, and the walk terminates with an error if the
    /// same OID is returned twice.
    ///
    /// **Warning**: this uses O(n) memory where n = number of walk results.
    /// Always pair it with [`ClientBuilder::max_walk_results`] to bound memory
    /// usage. Cycle detection only catches duplicate OIDs; a pathological
    /// agent could still return an infinite sequence of unique OIDs within
    /// the subtree.
    ///
    /// [`ClientBuilder::max_walk_results`]: crate::ClientBuilder::max_walk_results
    AllowNonIncreasing,
}
75
76/// Internal OID tracking for walk operations.
77///
78/// This enum implements two strategies for detecting walk termination
79/// conditions due to agent misbehavior:
80/// - `Strict`: O(1) memory, compares against previous OID
81/// - `Relaxed`: O(n) memory, tracks all seen OIDs in a HashSet
82enum OidTracker {
83    /// O(1) memory: stores only the previous OID for comparison.
84    /// Used by default Strict mode.
85    Strict { last: Option<Oid> },
86
87    /// O(n) memory: HashSet of all seen OIDs for cycle detection.
88    /// Only allocated when AllowNonIncreasing is configured.
89    Relaxed { seen: HashSet<Oid> },
90}
91
92impl OidTracker {
93    fn new(ordering: OidOrdering) -> Self {
94        match ordering {
95            OidOrdering::Strict => OidTracker::Strict { last: None },
96            OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
97                seen: HashSet::new(),
98            },
99        }
100    }
101
102    /// Check if OID is valid according to ordering rules.
103    /// Returns Ok(()) if valid, Err if violation detected.
104    fn check(&mut self, oid: &Oid) -> Result<()> {
105        match self {
106            OidTracker::Strict { last } => {
107                if let Some(prev) = last
108                    && oid <= prev
109                {
110                    return Err(Error::NonIncreasingOid {
111                        previous: prev.clone(),
112                        current: oid.clone(),
113                    });
114                }
115                *last = Some(oid.clone());
116                Ok(())
117            }
118            OidTracker::Relaxed { seen } => {
119                if !seen.insert(oid.clone()) {
120                    return Err(Error::DuplicateOid { oid: oid.clone() });
121                }
122                Ok(())
123            }
124        }
125    }
126}
127
128/// Async stream for walking an OID subtree using GETNEXT.
129///
130/// Created by [`Client::walk_getnext()`].
131pub struct Walk<T: Transport> {
132    client: Client<T>,
133    base_oid: Oid,
134    current_oid: Oid,
135    /// OID tracker for ordering validation.
136    oid_tracker: OidTracker,
137    /// Maximum number of results to return (None = unlimited).
138    max_results: Option<usize>,
139    /// Count of results returned so far.
140    count: usize,
141    done: bool,
142    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
143}
144
145impl<T: Transport> Walk<T> {
146    pub(crate) fn new(
147        client: Client<T>,
148        oid: Oid,
149        ordering: OidOrdering,
150        max_results: Option<usize>,
151    ) -> Self {
152        Self {
153            client,
154            base_oid: oid.clone(),
155            current_oid: oid,
156            oid_tracker: OidTracker::new(ordering),
157            max_results,
158            count: 0,
159            done: false,
160            pending: None,
161        }
162    }
163}
164
165impl<T: Transport + 'static> Walk<T> {
166    /// Get the next varbind, or None when complete.
167    pub async fn next(&mut self) -> Option<Result<VarBind>> {
168        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
169    }
170
171    /// Collect all remaining varbinds.
172    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
173        let mut results = Vec::new();
174        while let Some(result) = self.next().await {
175            results.push(result?);
176        }
177        Ok(results)
178    }
179}
180
181impl<T: Transport + 'static> Stream for Walk<T> {
182    type Item = Result<VarBind>;
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        if self.done {
186            return Poll::Ready(None);
187        }
188
189        // Check max_results limit
190        if let Some(max) = self.max_results
191            && self.count >= max
192        {
193            self.done = true;
194            return Poll::Ready(None);
195        }
196
197        // Check if we have a pending request
198        if self.pending.is_none() {
199            // Start a new GETNEXT request
200            let client = self.client.clone();
201            let oid = self.current_oid.clone();
202
203            let fut = Box::pin(async move { client.get_next(&oid).await });
204            self.pending = Some(fut);
205        }
206
207        // Poll the pending future
208        let pending = self.pending.as_mut().unwrap();
209        match pending.as_mut().poll(cx) {
210            Poll::Pending => Poll::Pending,
211            Poll::Ready(result) => {
212                self.pending = None;
213
214                match result {
215                    Ok(vb) => {
216                        // Check for end conditions
217                        if matches!(vb.value, Value::EndOfMibView) {
218                            self.done = true;
219                            return Poll::Ready(None);
220                        }
221
222                        // Check if OID left the subtree
223                        if !vb.oid.starts_with(&self.base_oid) {
224                            self.done = true;
225                            return Poll::Ready(None);
226                        }
227
228                        // Check OID ordering using the tracker
229                        if let Err(e) = self.oid_tracker.check(&vb.oid) {
230                            self.done = true;
231                            return Poll::Ready(Some(Err(e)));
232                        }
233
234                        // Update current OID for next iteration
235                        self.current_oid = vb.oid.clone();
236                        self.count += 1;
237
238                        Poll::Ready(Some(Ok(vb)))
239                    }
240                    Err(e) => {
241                        self.done = true;
242                        Poll::Ready(Some(Err(e)))
243                    }
244                }
245            }
246        }
247    }
248}
249
250/// Async stream for walking an OID subtree using GETBULK.
251///
252/// Created by [`Client::bulk_walk()`].
253pub struct BulkWalk<T: Transport> {
254    client: Client<T>,
255    base_oid: Oid,
256    current_oid: Oid,
257    max_repetitions: i32,
258    /// OID tracker for ordering validation.
259    oid_tracker: OidTracker,
260    /// Maximum number of results to return (None = unlimited).
261    max_results: Option<usize>,
262    /// Count of results returned so far.
263    count: usize,
264    done: bool,
265    /// Buffered results from the last GETBULK response
266    buffer: Vec<VarBind>,
267    /// Index into the buffer
268    buffer_idx: usize,
269    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
270}
271
272impl<T: Transport> BulkWalk<T> {
273    pub(crate) fn new(
274        client: Client<T>,
275        oid: Oid,
276        max_repetitions: i32,
277        ordering: OidOrdering,
278        max_results: Option<usize>,
279    ) -> Self {
280        Self {
281            client,
282            base_oid: oid.clone(),
283            current_oid: oid,
284            max_repetitions,
285            oid_tracker: OidTracker::new(ordering),
286            max_results,
287            count: 0,
288            done: false,
289            buffer: Vec::new(),
290            buffer_idx: 0,
291            pending: None,
292        }
293    }
294}
295
296impl<T: Transport + 'static> BulkWalk<T> {
297    /// Get the next varbind, or None when complete.
298    pub async fn next(&mut self) -> Option<Result<VarBind>> {
299        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
300    }
301
302    /// Collect all remaining varbinds.
303    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
304        let mut results = Vec::new();
305        while let Some(result) = self.next().await {
306            results.push(result?);
307        }
308        Ok(results)
309    }
310}
311
312impl<T: Transport + 'static> Stream for BulkWalk<T> {
313    type Item = Result<VarBind>;
314
315    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316        loop {
317            if self.done {
318                return Poll::Ready(None);
319            }
320
321            // Check max_results limit
322            if let Some(max) = self.max_results
323                && self.count >= max
324            {
325                self.done = true;
326                return Poll::Ready(None);
327            }
328
329            // Check if we have buffered results to return
330            if self.buffer_idx < self.buffer.len() {
331                let vb = self.buffer[self.buffer_idx].clone();
332                self.buffer_idx += 1;
333
334                // Check for end conditions
335                if matches!(vb.value, Value::EndOfMibView) {
336                    self.done = true;
337                    return Poll::Ready(None);
338                }
339
340                // Check if OID left the subtree
341                if !vb.oid.starts_with(&self.base_oid) {
342                    self.done = true;
343                    return Poll::Ready(None);
344                }
345
346                // Check OID ordering using the tracker
347                if let Err(e) = self.oid_tracker.check(&vb.oid) {
348                    self.done = true;
349                    return Poll::Ready(Some(Err(e)));
350                }
351
352                // Update current OID for next request
353                self.current_oid = vb.oid.clone();
354                self.count += 1;
355
356                return Poll::Ready(Some(Ok(vb)));
357            }
358
359            // Buffer exhausted, need to fetch more
360            if self.pending.is_none() {
361                let client = self.client.clone();
362                let oid = self.current_oid.clone();
363                let max_rep = self.max_repetitions;
364
365                let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
366                self.pending = Some(fut);
367            }
368
369            // Poll the pending future
370            let pending = self.pending.as_mut().unwrap();
371            match pending.as_mut().poll(cx) {
372                Poll::Pending => return Poll::Pending,
373                Poll::Ready(result) => {
374                    self.pending = None;
375
376                    match result {
377                        Ok(varbinds) => {
378                            if varbinds.is_empty() {
379                                self.done = true;
380                                return Poll::Ready(None);
381                            }
382
383                            self.buffer = varbinds;
384                            self.buffer_idx = 0;
385                            // Continue loop to process buffer
386                        }
387                        Err(e) => {
388                            self.done = true;
389                            return Poll::Ready(Some(Err(e)));
390                        }
391                    }
392                }
393            }
394        }
395    }
396}
397
398// ============================================================================
399// Unified WalkStream - auto-selects GETNEXT or GETBULK based on WalkMode
400// ============================================================================
401
402/// Unified walk stream that auto-selects between GETNEXT and GETBULK.
403///
404/// Created by [`Client::walk()`] when using `WalkMode::Auto` or explicit mode selection.
405/// This type wraps either a [`Walk`] or [`BulkWalk`] internally based on:
406/// - `WalkMode::Auto`: Uses GETNEXT for V1, GETBULK for V2c/V3
407/// - `WalkMode::GetNext`: Always uses GETNEXT
408/// - `WalkMode::GetBulk`: Always uses GETBULK (fails on V1)
409pub enum WalkStream<T: Transport> {
410    /// GETNEXT-based walk (used for V1 or when explicitly requested)
411    GetNext(Walk<T>),
412    /// GETBULK-based walk (used for V2c/V3 or when explicitly requested)
413    GetBulk(BulkWalk<T>),
414}
415
416impl<T: Transport> WalkStream<T> {
417    /// Create a new walk stream with auto-selection based on version and walk mode.
418    pub(crate) fn new(
419        client: Client<T>,
420        oid: Oid,
421        version: Version,
422        walk_mode: WalkMode,
423        ordering: OidOrdering,
424        max_results: Option<usize>,
425        max_repetitions: i32,
426    ) -> Result<Self> {
427        let use_bulk = match walk_mode {
428            WalkMode::Auto => version != Version::V1,
429            WalkMode::GetNext => false,
430            WalkMode::GetBulk => {
431                if version == Version::V1 {
432                    return Err(Error::GetBulkNotSupportedInV1);
433                }
434                true
435            }
436        };
437
438        Ok(if use_bulk {
439            WalkStream::GetBulk(BulkWalk::new(
440                client,
441                oid,
442                max_repetitions,
443                ordering,
444                max_results,
445            ))
446        } else {
447            WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
448        })
449    }
450}
451
452impl<T: Transport + 'static> WalkStream<T> {
453    /// Get the next varbind, or None when complete.
454    pub async fn next(&mut self) -> Option<Result<VarBind>> {
455        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
456    }
457
458    /// Collect all remaining varbinds.
459    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
460        let mut results = Vec::new();
461        while let Some(result) = self.next().await {
462            results.push(result?);
463        }
464        Ok(results)
465    }
466}
467
468impl<T: Transport + 'static> Stream for WalkStream<T> {
469    type Item = Result<VarBind>;
470
471    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472        // SAFETY: We're just projecting the pin to the inner enum variant
473        match self.get_mut() {
474            WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
475            WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
476        }
477    }
478}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use crate::transport::{MockTransport, ResponseBuilder};
484    use crate::{ClientConfig, Version};
485    use bytes::Bytes;
486    use futures_core::Stream;
487    use std::pin::Pin;
488    use std::task::Context;
489    use std::time::Duration;
490
491    fn mock_client(mock: MockTransport) -> Client<MockTransport> {
492        let config = ClientConfig {
493            version: Version::V2c,
494            community: Bytes::from_static(b"public"),
495            timeout: Duration::from_secs(1),
496            retries: 0,
497            max_oids_per_request: 10,
498            v3_security: None,
499            walk_mode: WalkMode::Auto,
500            oid_ordering: OidOrdering::Strict,
501            max_walk_results: None,
502            max_repetitions: 25,
503        };
504        Client::new(mock, config)
505    }
506
507    async fn collect_walk<T: Transport + 'static>(
508        mut walk: Pin<&mut Walk<T>>,
509        limit: usize,
510    ) -> Vec<Result<VarBind>> {
511        use std::future::poll_fn;
512
513        let mut results = Vec::new();
514        while results.len() < limit {
515            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
516            match item {
517                Some(result) => results.push(result),
518                None => break,
519            }
520        }
521        results
522    }
523
524    async fn collect_bulk_walk<T: Transport + 'static>(
525        mut walk: Pin<&mut BulkWalk<T>>,
526        limit: usize,
527    ) -> Vec<Result<VarBind>> {
528        use std::future::poll_fn;
529
530        let mut results = Vec::new();
531        while results.len() < limit {
532            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
533            match item {
534                Some(result) => results.push(result),
535                None => break,
536            }
537        }
538        results
539    }
540
541    #[tokio::test]
542    async fn test_walk_terminates_on_end_of_mib_view() {
543        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
544
545        // First response: valid OID in subtree
546        mock.queue_response(
547            ResponseBuilder::new(1)
548                .varbind(
549                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
550                    Value::OctetString("test".into()),
551                )
552                .build_v2c(b"public"),
553        );
554
555        // Second response: EndOfMibView
556        mock.queue_response(
557            ResponseBuilder::new(2)
558                .varbind(
559                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
560                    Value::EndOfMibView,
561                )
562                .build_v2c(b"public"),
563        );
564
565        let client = mock_client(mock);
566        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
567
568        let mut pinned = Box::pin(walk);
569        let results = collect_walk(pinned.as_mut(), 10).await;
570
571        assert_eq!(results.len(), 1);
572        assert!(results[0].is_ok());
573    }
574
575    #[tokio::test]
576    async fn test_walk_terminates_when_leaving_subtree() {
577        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
578
579        // Response with OID outside the walked subtree (interfaces, not system)
580        mock.queue_response(
581            ResponseBuilder::new(1)
582                .varbind(
583                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces subtree
584                    Value::Integer(1),
585                )
586                .build_v2c(b"public"),
587        );
588
589        let client = mock_client(mock);
590        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])); // system subtree
591
592        let mut pinned = Box::pin(walk);
593        let results = collect_walk(pinned.as_mut(), 10).await;
594
595        // Should terminate immediately with no results
596        assert_eq!(results.len(), 0);
597    }
598
599    #[tokio::test]
600    async fn test_walk_returns_oids_in_sequence() {
601        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
602
603        // Queue three responses in lexicographic order
604        mock.queue_response(
605            ResponseBuilder::new(1)
606                .varbind(
607                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
608                    Value::OctetString("desc".into()),
609                )
610                .build_v2c(b"public"),
611        );
612        mock.queue_response(
613            ResponseBuilder::new(2)
614                .varbind(
615                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
616                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
617                )
618                .build_v2c(b"public"),
619        );
620        mock.queue_response(
621            ResponseBuilder::new(3)
622                .varbind(
623                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
624                    Value::TimeTicks(12345),
625                )
626                .build_v2c(b"public"),
627        );
628        // Fourth response leaves subtree
629        mock.queue_response(
630            ResponseBuilder::new(4)
631                .varbind(
632                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
633                    Value::Integer(1),
634                )
635                .build_v2c(b"public"),
636        );
637
638        let client = mock_client(mock);
639        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
640
641        let mut pinned = Box::pin(walk);
642        let results = collect_walk(pinned.as_mut(), 10).await;
643
644        assert_eq!(results.len(), 3);
645
646        // Verify lexicographic ordering
647        let oids: Vec<_> = results
648            .iter()
649            .filter_map(|r| r.as_ref().ok())
650            .map(|vb| &vb.oid)
651            .collect();
652        for i in 1..oids.len() {
653            assert!(oids[i] > oids[i - 1], "OIDs should be strictly increasing");
654        }
655    }
656
657    #[tokio::test]
658    async fn test_walk_propagates_errors() {
659        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
660
661        // First response succeeds
662        mock.queue_response(
663            ResponseBuilder::new(1)
664                .varbind(
665                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
666                    Value::OctetString("test".into()),
667                )
668                .build_v2c(b"public"),
669        );
670
671        // Second request times out
672        mock.queue_timeout();
673
674        let client = mock_client(mock);
675        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
676
677        let mut pinned = Box::pin(walk);
678        let results = collect_walk(pinned.as_mut(), 10).await;
679
680        assert_eq!(results.len(), 2);
681        assert!(results[0].is_ok());
682        assert!(results[1].is_err());
683    }
684
685    #[tokio::test]
686    async fn test_bulk_walk_terminates_on_end_of_mib_view() {
687        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
688
689        // GETBULK response with multiple varbinds, last one is EndOfMibView
690        mock.queue_response(
691            ResponseBuilder::new(1)
692                .varbind(
693                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
694                    Value::OctetString("desc".into()),
695                )
696                .varbind(
697                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
698                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
699                )
700                .varbind(
701                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
702                    Value::EndOfMibView,
703                )
704                .build_v2c(b"public"),
705        );
706
707        let client = mock_client(mock);
708        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
709
710        let mut pinned = Box::pin(walk);
711        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
712
713        // Should return 2 valid results before EndOfMibView terminates
714        assert_eq!(results.len(), 2);
715        assert!(results.iter().all(|r| r.is_ok()));
716    }
717
718    #[tokio::test]
719    async fn test_bulk_walk_terminates_when_leaving_subtree() {
720        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
721
722        // GETBULK returns varbinds, some in subtree, one outside
723        mock.queue_response(
724            ResponseBuilder::new(1)
725                .varbind(
726                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
727                    Value::OctetString("desc".into()),
728                )
729                .varbind(
730                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
731                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
732                )
733                .varbind(
734                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces - outside system
735                    Value::Integer(1),
736                )
737                .build_v2c(b"public"),
738        );
739
740        let client = mock_client(mock);
741        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
742
743        let mut pinned = Box::pin(walk);
744        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
745
746        // Should return 2 results (third OID is outside subtree)
747        assert_eq!(results.len(), 2);
748    }
749
750    #[tokio::test]
751    async fn test_bulk_walk_handles_empty_response() {
752        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
753
754        // Empty GETBULK response (no varbinds)
755        mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));
756
757        let client = mock_client(mock);
758        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
759
760        let mut pinned = Box::pin(walk);
761        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
762
763        // Should return empty
764        assert_eq!(results.len(), 0);
765    }
766
767    // Tests for non-increasing OID detection.
768    // These prevent infinite loops on non-conformant SNMP agents.
769
770    #[tokio::test]
771    async fn test_walk_errors_on_decreasing_oid() {
772        use crate::error::Error;
773
774        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
775
776        // First response: .1.3.6.1.2.1.1.5.0
777        mock.queue_response(
778            ResponseBuilder::new(1)
779                .varbind(
780                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
781                    Value::OctetString("host1".into()),
782                )
783                .build_v2c(b"public"),
784        );
785
786        // Second response: .1.3.6.1.2.1.1.4.0 (DECREASING - goes backwards!)
787        mock.queue_response(
788            ResponseBuilder::new(2)
789                .varbind(
790                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
791                    Value::OctetString("admin".into()),
792                )
793                .build_v2c(b"public"),
794        );
795
796        let client = mock_client(mock);
797        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
798
799        let mut pinned = Box::pin(walk);
800        let results = collect_walk(pinned.as_mut(), 10).await;
801
802        // Should get first result OK, then error on second
803        assert_eq!(results.len(), 2);
804        assert!(results[0].is_ok());
805        assert!(matches!(
806            &results[1],
807            Err(Error::NonIncreasingOid { previous, current })
808            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0])
809               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0])
810        ));
811    }
812
813    #[tokio::test]
814    async fn test_walk_errors_on_same_oid_returned_twice() {
815        use crate::error::Error;
816
817        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
818
819        // First response: .1.3.6.1.2.1.1.1.0
820        mock.queue_response(
821            ResponseBuilder::new(1)
822                .varbind(
823                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
824                    Value::OctetString("desc".into()),
825                )
826                .build_v2c(b"public"),
827        );
828
829        // Second response: same OID again! (would cause infinite loop)
830        mock.queue_response(
831            ResponseBuilder::new(2)
832                .varbind(
833                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
834                    Value::OctetString("desc".into()),
835                )
836                .build_v2c(b"public"),
837        );
838
839        let client = mock_client(mock);
840        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
841
842        let mut pinned = Box::pin(walk);
843        let results = collect_walk(pinned.as_mut(), 10).await;
844
845        // Should get first result OK, then error on second
846        assert_eq!(results.len(), 2);
847        assert!(results[0].is_ok());
848        assert!(matches!(
849            &results[1],
850            Err(Error::NonIncreasingOid { previous, current })
851            if previous == current
852        ));
853    }
854
855    #[tokio::test]
856    async fn test_bulk_walk_errors_on_non_increasing_oid() {
857        use crate::error::Error;
858
859        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
860
861        // First GETBULK response with non-increasing OID in the batch
862        mock.queue_response(
863            ResponseBuilder::new(1)
864                .varbind(
865                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
866                    Value::OctetString("desc".into()),
867                )
868                .varbind(
869                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
870                    Value::TimeTicks(12345),
871                )
872                .varbind(
873                    // Non-increasing: .1.2.0 < .3.0 (goes backwards)
874                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
875                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
876                )
877                .build_v2c(b"public"),
878        );
879
880        let client = mock_client(mock);
881        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
882
883        let mut pinned = Box::pin(walk);
884        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
885
886        // Should get first two results OK, then error on third
887        assert_eq!(results.len(), 3);
888        assert!(results[0].is_ok());
889        assert!(results[1].is_ok());
890        assert!(matches!(
891            &results[2],
892            Err(Error::NonIncreasingOid { previous, current })
893            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0])
894               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0])
895        ));
896    }
897
898    // Tests for AllowNonIncreasing OID ordering mode
899
900    #[tokio::test]
901    async fn test_walk_allow_non_increasing_accepts_out_of_order() {
902        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
903
904        // Three OIDs out of order, but no duplicates
905        mock.queue_response(
906            ResponseBuilder::new(1)
907                .varbind(
908                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
909                    Value::OctetString("five".into()),
910                )
911                .build_v2c(b"public"),
912        );
913        mock.queue_response(
914            ResponseBuilder::new(2)
915                .varbind(
916                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]), // out of order
917                    Value::OctetString("three".into()),
918                )
919                .build_v2c(b"public"),
920        );
921        mock.queue_response(
922            ResponseBuilder::new(3)
923                .varbind(
924                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 7, 0]),
925                    Value::OctetString("seven".into()),
926                )
927                .build_v2c(b"public"),
928        );
929        // Fourth response leaves subtree
930        mock.queue_response(
931            ResponseBuilder::new(4)
932                .varbind(
933                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
934                    Value::Integer(1),
935                )
936                .build_v2c(b"public"),
937        );
938
939        let client = mock_client(mock);
940        let walk = Walk::new(
941            client,
942            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
943            OidOrdering::AllowNonIncreasing,
944            None,
945        );
946
947        let mut pinned = Box::pin(walk);
948        let results = collect_walk(pinned.as_mut(), 10).await;
949
950        // Should get all three results successfully
951        assert_eq!(results.len(), 3);
952        assert!(results.iter().all(|r| r.is_ok()));
953    }
954
955    #[tokio::test]
956    async fn test_walk_allow_non_increasing_detects_cycle() {
957        use crate::error::Error;
958
959        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
960
961        // First OID
962        mock.queue_response(
963            ResponseBuilder::new(1)
964                .varbind(
965                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
966                    Value::OctetString("first".into()),
967                )
968                .build_v2c(b"public"),
969        );
970        // Second OID
971        mock.queue_response(
972            ResponseBuilder::new(2)
973                .varbind(
974                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
975                    Value::OctetString("second".into()),
976                )
977                .build_v2c(b"public"),
978        );
979        // Same as first - cycle!
980        mock.queue_response(
981            ResponseBuilder::new(3)
982                .varbind(
983                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
984                    Value::OctetString("first-again".into()),
985                )
986                .build_v2c(b"public"),
987        );
988
989        let client = mock_client(mock);
990        let walk = Walk::new(
991            client,
992            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
993            OidOrdering::AllowNonIncreasing,
994            None,
995        );
996
997        let mut pinned = Box::pin(walk);
998        let results = collect_walk(pinned.as_mut(), 10).await;
999
1000        // Should get first two OK, then DuplicateOid error
1001        assert_eq!(results.len(), 3);
1002        assert!(results[0].is_ok());
1003        assert!(results[1].is_ok());
1004        assert!(matches!(
1005            &results[2],
1006            Err(Error::DuplicateOid { oid })
1007            if oid == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0])
1008        ));
1009    }
1010
1011    // Tests for max_results limit
1012
1013    #[tokio::test]
1014    async fn test_walk_respects_max_results() {
1015        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1016
1017        // Queue many responses
1018        for i in 1..=10 {
1019            mock.queue_response(
1020                ResponseBuilder::new(i)
1021                    .varbind(
1022                        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, i as u32, 0]),
1023                        Value::Integer(i),
1024                    )
1025                    .build_v2c(b"public"),
1026            );
1027        }
1028
1029        let client = mock_client(mock);
1030        let walk = Walk::new(
1031            client,
1032            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1033            OidOrdering::Strict,
1034            Some(3), // Limit to 3 results
1035        );
1036
1037        let mut pinned = Box::pin(walk);
1038        let results = collect_walk(pinned.as_mut(), 20).await;
1039
1040        // Should stop after 3 results
1041        assert_eq!(results.len(), 3);
1042        assert!(results.iter().all(|r| r.is_ok()));
1043    }
1044
1045    #[tokio::test]
1046    async fn test_bulk_walk_respects_max_results() {
1047        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1048
1049        // GETBULK returns many varbinds at once
1050        mock.queue_response(
1051            ResponseBuilder::new(1)
1052                .varbind(
1053                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1054                    Value::Integer(1),
1055                )
1056                .varbind(
1057                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1058                    Value::Integer(2),
1059                )
1060                .varbind(
1061                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
1062                    Value::Integer(3),
1063                )
1064                .varbind(
1065                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
1066                    Value::Integer(4),
1067                )
1068                .varbind(
1069                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
1070                    Value::Integer(5),
1071                )
1072                .build_v2c(b"public"),
1073        );
1074
1075        let client = mock_client(mock);
1076        let walk = BulkWalk::new(
1077            client,
1078            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1079            10,
1080            OidOrdering::Strict,
1081            Some(3), // Limit to 3 results
1082        );
1083
1084        let mut pinned = Box::pin(walk);
1085        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
1086
1087        // Should stop after 3 results even though buffer has more
1088        assert_eq!(results.len(), 3);
1089        assert!(results.iter().all(|r| r.is_ok()));
1090    }
1091
1092    // Tests for inherent next() and collect() methods
1093
1094    #[tokio::test]
1095    async fn test_walk_inherent_next() {
1096        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1097
1098        mock.queue_response(
1099            ResponseBuilder::new(1)
1100                .varbind(
1101                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1102                    Value::OctetString("test".into()),
1103                )
1104                .build_v2c(b"public"),
1105        );
1106        mock.queue_response(
1107            ResponseBuilder::new(2)
1108                .varbind(
1109                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1110                    Value::Integer(42),
1111                )
1112                .build_v2c(b"public"),
1113        );
1114        mock.queue_response(
1115            ResponseBuilder::new(3)
1116                .varbind(
1117                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1118                    Value::Integer(1),
1119                )
1120                .build_v2c(b"public"),
1121        );
1122
1123        let client = mock_client(mock);
1124        let mut walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1125
1126        // Use inherent next() method
1127        let first = walk.next().await;
1128        assert!(first.is_some());
1129        assert!(first.unwrap().is_ok());
1130
1131        let second = walk.next().await;
1132        assert!(second.is_some());
1133        assert!(second.unwrap().is_ok());
1134
1135        let third = walk.next().await;
1136        assert!(third.is_none()); // Walk ended
1137    }
1138
1139    #[tokio::test]
1140    async fn test_walk_inherent_collect() {
1141        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1142
1143        mock.queue_response(
1144            ResponseBuilder::new(1)
1145                .varbind(
1146                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1147                    Value::OctetString("test".into()),
1148                )
1149                .build_v2c(b"public"),
1150        );
1151        mock.queue_response(
1152            ResponseBuilder::new(2)
1153                .varbind(
1154                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1155                    Value::Integer(42),
1156                )
1157                .build_v2c(b"public"),
1158        );
1159        mock.queue_response(
1160            ResponseBuilder::new(3)
1161                .varbind(
1162                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1163                    Value::Integer(1),
1164                )
1165                .build_v2c(b"public"),
1166        );
1167
1168        let client = mock_client(mock);
1169        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1170
1171        // Use inherent collect() method
1172        let results = walk.collect().await.unwrap();
1173        assert_eq!(results.len(), 2);
1174    }
1175
1176    #[tokio::test]
1177    async fn test_bulk_walk_inherent_collect() {
1178        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1179
1180        mock.queue_response(
1181            ResponseBuilder::new(1)
1182                .varbind(
1183                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1184                    Value::OctetString("desc".into()),
1185                )
1186                .varbind(
1187                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1188                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
1189                )
1190                .varbind(
1191                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // outside system
1192                    Value::Integer(1),
1193                )
1194                .build_v2c(b"public"),
1195        );
1196
1197        let client = mock_client(mock);
1198        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
1199
1200        // Use inherent collect() method
1201        let results = walk.collect().await.unwrap();
1202        assert_eq!(results.len(), 2);
1203    }
1204}