async_snmp/client/
walk.rs

1//! Walk stream implementations.
2
3#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
20/// Walk operation mode.
21#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23pub enum WalkMode {
24    /// Auto-select based on version (default).
25    /// V1 uses GETNEXT, V2c/V3 uses GETBULK.
26    #[default]
27    Auto,
28    /// Always use GETNEXT (slower but more compatible).
29    GetNext,
30    /// Always use GETBULK (faster, errors on v1).
31    GetBulk,
32}
33
34/// OID ordering behavior during walk operations.
35///
36/// SNMP walks rely on agents returning OIDs in strictly increasing
37/// lexicographic order. However, some buggy agents violate this requirement,
38/// returning OIDs out of order or even repeating OIDs (which would cause
39/// infinite loops).
40///
41/// This enum controls how the library handles ordering violations:
42///
43/// - [`Strict`](Self::Strict) (default): Terminates immediately with
44///   [`Error::NonIncreasingOid`](crate::Error::NonIncreasingOid) on any violation.
45///   Use this unless you know the agent has ordering bugs.
46///
47/// - [`AllowNonIncreasing`](Self::AllowNonIncreasing): Tolerates out-of-order
48///   OIDs but tracks all seen OIDs to detect cycles. Returns
49///   [`Error::DuplicateOid`](crate::Error::DuplicateOid) if the same OID appears twice.
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub enum OidOrdering {
53    /// Require strictly increasing OIDs (default).
54    ///
55    /// Walk terminates with [`Error::NonIncreasingOid`](crate::Error::NonIncreasingOid)
56    /// on first violation. Most efficient: O(1) memory, O(1) per-item check.
57    #[default]
58    Strict,
59
60    /// Allow non-increasing OIDs, with cycle detection.
61    ///
62    /// Some buggy agents return OIDs out of order. This mode tracks all seen
63    /// OIDs in a HashSet to detect cycles, terminating with an error if the
64    /// same OID is returned twice.
65    ///
66    /// **Warning**: This uses O(n) memory where n = number of walk results.
67    /// Always pair with [`ClientBuilder::max_walk_results`] to bound memory
68    /// usage. Cycle detection only catches duplicate OIDs; a pathological
69    /// agent could still return an infinite sequence of unique OIDs within
70    /// the subtree.
71    ///
72    /// [`ClientBuilder::max_walk_results`]: crate::ClientBuilder::max_walk_results
73    AllowNonIncreasing,
74}
75
76/// Internal OID tracking for walk operations.
77///
78/// This enum implements two strategies for detecting walk termination
79/// conditions due to agent misbehavior:
80/// - `Strict`: O(1) memory, compares against previous OID
81/// - `Relaxed`: O(n) memory, tracks all seen OIDs in a HashSet
82enum OidTracker {
83    /// O(1) memory: stores only the previous OID for comparison.
84    /// Used by default Strict mode.
85    Strict { last: Option<Oid> },
86
87    /// O(n) memory: HashSet of all seen OIDs for cycle detection.
88    /// Only allocated when AllowNonIncreasing is configured.
89    Relaxed { seen: HashSet<Oid> },
90}
91
92impl OidTracker {
93    fn new(ordering: OidOrdering) -> Self {
94        match ordering {
95            OidOrdering::Strict => OidTracker::Strict { last: None },
96            OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
97                seen: HashSet::new(),
98            },
99        }
100    }
101
102    /// Check if OID is valid according to ordering rules.
103    /// Returns Ok(()) if valid, Err if violation detected.
104    fn check(&mut self, oid: &Oid) -> Result<()> {
105        match self {
106            OidTracker::Strict { last } => {
107                if let Some(prev) = last
108                    && oid <= prev
109                {
110                    return Err(Error::NonIncreasingOid {
111                        previous: prev.clone(),
112                        current: oid.clone(),
113                    });
114                }
115                *last = Some(oid.clone());
116                Ok(())
117            }
118            OidTracker::Relaxed { seen } => {
119                if !seen.insert(oid.clone()) {
120                    return Err(Error::DuplicateOid { oid: oid.clone() });
121                }
122                Ok(())
123            }
124        }
125    }
126}
127
128/// Async stream for walking an OID subtree using GETNEXT.
129///
130/// Created by [`Client::walk_getnext()`].
131pub struct Walk<T: Transport> {
132    client: Client<T>,
133    base_oid: Oid,
134    current_oid: Oid,
135    /// OID tracker for ordering validation.
136    oid_tracker: OidTracker,
137    /// Maximum number of results to return (None = unlimited).
138    max_results: Option<usize>,
139    /// Count of results returned so far.
140    count: usize,
141    done: bool,
142    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
143}
144
145impl<T: Transport> Walk<T> {
146    pub(crate) fn new(
147        client: Client<T>,
148        oid: Oid,
149        ordering: OidOrdering,
150        max_results: Option<usize>,
151    ) -> Self {
152        Self {
153            client,
154            base_oid: oid.clone(),
155            current_oid: oid,
156            oid_tracker: OidTracker::new(ordering),
157            max_results,
158            count: 0,
159            done: false,
160            pending: None,
161        }
162    }
163}
164
165impl<T: Transport + 'static> Walk<T> {
166    /// Get the next varbind, or None when complete.
167    pub async fn next(&mut self) -> Option<Result<VarBind>> {
168        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
169    }
170
171    /// Collect all remaining varbinds.
172    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
173        let mut results = Vec::new();
174        while let Some(result) = self.next().await {
175            results.push(result?);
176        }
177        Ok(results)
178    }
179}
180
181impl<T: Transport + 'static> Stream for Walk<T> {
182    type Item = Result<VarBind>;
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        if self.done {
186            return Poll::Ready(None);
187        }
188
189        // Check max_results limit
190        if let Some(max) = self.max_results
191            && self.count >= max
192        {
193            self.done = true;
194            return Poll::Ready(None);
195        }
196
197        // Check if we have a pending request
198        if self.pending.is_none() {
199            // Start a new GETNEXT request
200            let client = self.client.clone();
201            let oid = self.current_oid.clone();
202
203            let fut = Box::pin(async move { client.get_next(&oid).await });
204            self.pending = Some(fut);
205        }
206
207        // Poll the pending future
208        let pending = self.pending.as_mut().unwrap();
209        match pending.as_mut().poll(cx) {
210            Poll::Pending => Poll::Pending,
211            Poll::Ready(result) => {
212                self.pending = None;
213
214                match result {
215                    Ok(vb) => {
216                        // Check for end conditions
217                        if matches!(vb.value, Value::EndOfMibView) {
218                            self.done = true;
219                            return Poll::Ready(None);
220                        }
221
222                        // Check if OID left the subtree
223                        if !vb.oid.starts_with(&self.base_oid) {
224                            self.done = true;
225                            return Poll::Ready(None);
226                        }
227
228                        // Check OID ordering using the tracker
229                        if let Err(e) = self.oid_tracker.check(&vb.oid) {
230                            self.done = true;
231                            return Poll::Ready(Some(Err(e)));
232                        }
233
234                        // Update current OID for next iteration
235                        self.current_oid = vb.oid.clone();
236                        self.count += 1;
237
238                        Poll::Ready(Some(Ok(vb)))
239                    }
240                    Err(e) => {
241                        self.done = true;
242                        Poll::Ready(Some(Err(e)))
243                    }
244                }
245            }
246        }
247    }
248}
249
250/// Async stream for walking an OID subtree using GETBULK.
251///
252/// Created by [`Client::bulk_walk()`].
253pub struct BulkWalk<T: Transport> {
254    client: Client<T>,
255    base_oid: Oid,
256    current_oid: Oid,
257    max_repetitions: i32,
258    /// OID tracker for ordering validation.
259    oid_tracker: OidTracker,
260    /// Maximum number of results to return (None = unlimited).
261    max_results: Option<usize>,
262    /// Count of results returned so far.
263    count: usize,
264    done: bool,
265    /// Buffered results from the last GETBULK response
266    buffer: Vec<VarBind>,
267    /// Index into the buffer
268    buffer_idx: usize,
269    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
270}
271
272impl<T: Transport> BulkWalk<T> {
273    pub(crate) fn new(
274        client: Client<T>,
275        oid: Oid,
276        max_repetitions: i32,
277        ordering: OidOrdering,
278        max_results: Option<usize>,
279    ) -> Self {
280        Self {
281            client,
282            base_oid: oid.clone(),
283            current_oid: oid,
284            max_repetitions,
285            oid_tracker: OidTracker::new(ordering),
286            max_results,
287            count: 0,
288            done: false,
289            buffer: Vec::new(),
290            buffer_idx: 0,
291            pending: None,
292        }
293    }
294}
295
296impl<T: Transport + 'static> BulkWalk<T> {
297    /// Get the next varbind, or None when complete.
298    pub async fn next(&mut self) -> Option<Result<VarBind>> {
299        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
300    }
301
302    /// Collect all remaining varbinds.
303    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
304        let mut results = Vec::new();
305        while let Some(result) = self.next().await {
306            results.push(result?);
307        }
308        Ok(results)
309    }
310}
311
312impl<T: Transport + 'static> Stream for BulkWalk<T> {
313    type Item = Result<VarBind>;
314
315    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316        loop {
317            if self.done {
318                return Poll::Ready(None);
319            }
320
321            // Check max_results limit
322            if let Some(max) = self.max_results
323                && self.count >= max
324            {
325                self.done = true;
326                return Poll::Ready(None);
327            }
328
329            // Check if we have buffered results to return
330            if self.buffer_idx < self.buffer.len() {
331                let vb = self.buffer[self.buffer_idx].clone();
332                self.buffer_idx += 1;
333
334                // Check for end conditions
335                if matches!(vb.value, Value::EndOfMibView) {
336                    self.done = true;
337                    return Poll::Ready(None);
338                }
339
340                // Check if OID left the subtree
341                if !vb.oid.starts_with(&self.base_oid) {
342                    self.done = true;
343                    return Poll::Ready(None);
344                }
345
346                // Check OID ordering using the tracker
347                if let Err(e) = self.oid_tracker.check(&vb.oid) {
348                    self.done = true;
349                    return Poll::Ready(Some(Err(e)));
350                }
351
352                // Update current OID for next request
353                self.current_oid = vb.oid.clone();
354                self.count += 1;
355
356                return Poll::Ready(Some(Ok(vb)));
357            }
358
359            // Buffer exhausted, need to fetch more
360            if self.pending.is_none() {
361                let client = self.client.clone();
362                let oid = self.current_oid.clone();
363                let max_rep = self.max_repetitions;
364
365                let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
366                self.pending = Some(fut);
367            }
368
369            // Poll the pending future
370            let pending = self.pending.as_mut().unwrap();
371            match pending.as_mut().poll(cx) {
372                Poll::Pending => return Poll::Pending,
373                Poll::Ready(result) => {
374                    self.pending = None;
375
376                    match result {
377                        Ok(varbinds) => {
378                            if varbinds.is_empty() {
379                                self.done = true;
380                                return Poll::Ready(None);
381                            }
382
383                            self.buffer = varbinds;
384                            self.buffer_idx = 0;
385                            // Continue loop to process buffer
386                        }
387                        Err(e) => {
388                            self.done = true;
389                            return Poll::Ready(Some(Err(e)));
390                        }
391                    }
392                }
393            }
394        }
395    }
396}
397
398// ============================================================================
399// Unified WalkStream - auto-selects GETNEXT or GETBULK based on WalkMode
400// ============================================================================
401
402/// Unified walk stream that auto-selects between GETNEXT and GETBULK.
403///
404/// Created by [`Client::walk()`] when using `WalkMode::Auto` or explicit mode selection.
405/// This type wraps either a [`Walk`] or [`BulkWalk`] internally based on:
406/// - `WalkMode::Auto`: Uses GETNEXT for V1, GETBULK for V2c/V3
407/// - `WalkMode::GetNext`: Always uses GETNEXT
408/// - `WalkMode::GetBulk`: Always uses GETBULK (fails on V1)
409pub enum WalkStream<T: Transport> {
410    /// GETNEXT-based walk (used for V1 or when explicitly requested)
411    GetNext(Walk<T>),
412    /// GETBULK-based walk (used for V2c/V3 or when explicitly requested)
413    GetBulk(BulkWalk<T>),
414}
415
416impl<T: Transport> WalkStream<T> {
417    /// Create a new walk stream with auto-selection based on version and walk mode.
418    pub(crate) fn new(
419        client: Client<T>,
420        oid: Oid,
421        version: Version,
422        walk_mode: WalkMode,
423        ordering: OidOrdering,
424        max_results: Option<usize>,
425        max_repetitions: i32,
426    ) -> Result<Self> {
427        let use_bulk = match walk_mode {
428            WalkMode::Auto => version != Version::V1,
429            WalkMode::GetNext => false,
430            WalkMode::GetBulk => {
431                if version == Version::V1 {
432                    return Err(Error::GetBulkNotSupportedInV1);
433                }
434                true
435            }
436        };
437
438        Ok(if use_bulk {
439            WalkStream::GetBulk(BulkWalk::new(
440                client,
441                oid,
442                max_repetitions,
443                ordering,
444                max_results,
445            ))
446        } else {
447            WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
448        })
449    }
450}
451
452impl<T: Transport + 'static> WalkStream<T> {
453    /// Get the next varbind, or None when complete.
454    pub async fn next(&mut self) -> Option<Result<VarBind>> {
455        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
456    }
457
458    /// Collect all remaining varbinds.
459    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
460        let mut results = Vec::new();
461        while let Some(result) = self.next().await {
462            results.push(result?);
463        }
464        Ok(results)
465    }
466}
467
468impl<T: Transport + 'static> Stream for WalkStream<T> {
469    type Item = Result<VarBind>;
470
471    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472        // SAFETY: We're just projecting the pin to the inner enum variant
473        match self.get_mut() {
474            WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
475            WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
476        }
477    }
478}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use crate::client::retry::Retry;
484    use crate::transport::{MockTransport, ResponseBuilder};
485    use crate::{ClientConfig, Version};
486    use bytes::Bytes;
487    use futures_core::Stream;
488    use std::pin::Pin;
489    use std::task::Context;
490    use std::time::Duration;
491
492    fn mock_client(mock: MockTransport) -> Client<MockTransport> {
493        let config = ClientConfig {
494            version: Version::V2c,
495            community: Bytes::from_static(b"public"),
496            timeout: Duration::from_secs(1),
497            retry: Retry::none(),
498            max_oids_per_request: 10,
499            v3_security: None,
500            walk_mode: WalkMode::Auto,
501            oid_ordering: OidOrdering::Strict,
502            max_walk_results: None,
503            max_repetitions: 25,
504        };
505        Client::new(mock, config)
506    }
507
508    async fn collect_walk<T: Transport + 'static>(
509        mut walk: Pin<&mut Walk<T>>,
510        limit: usize,
511    ) -> Vec<Result<VarBind>> {
512        use std::future::poll_fn;
513
514        let mut results = Vec::new();
515        while results.len() < limit {
516            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
517            match item {
518                Some(result) => results.push(result),
519                None => break,
520            }
521        }
522        results
523    }
524
525    async fn collect_bulk_walk<T: Transport + 'static>(
526        mut walk: Pin<&mut BulkWalk<T>>,
527        limit: usize,
528    ) -> Vec<Result<VarBind>> {
529        use std::future::poll_fn;
530
531        let mut results = Vec::new();
532        while results.len() < limit {
533            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
534            match item {
535                Some(result) => results.push(result),
536                None => break,
537            }
538        }
539        results
540    }
541
542    #[tokio::test]
543    async fn test_walk_terminates_on_end_of_mib_view() {
544        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
545
546        // First response: valid OID in subtree
547        mock.queue_response(
548            ResponseBuilder::new(1)
549                .varbind(
550                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
551                    Value::OctetString("test".into()),
552                )
553                .build_v2c(b"public"),
554        );
555
556        // Second response: EndOfMibView
557        mock.queue_response(
558            ResponseBuilder::new(2)
559                .varbind(
560                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
561                    Value::EndOfMibView,
562                )
563                .build_v2c(b"public"),
564        );
565
566        let client = mock_client(mock);
567        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
568
569        let mut pinned = Box::pin(walk);
570        let results = collect_walk(pinned.as_mut(), 10).await;
571
572        assert_eq!(results.len(), 1);
573        assert!(results[0].is_ok());
574    }
575
576    #[tokio::test]
577    async fn test_walk_terminates_when_leaving_subtree() {
578        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
579
580        // Response with OID outside the walked subtree (interfaces, not system)
581        mock.queue_response(
582            ResponseBuilder::new(1)
583                .varbind(
584                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces subtree
585                    Value::Integer(1),
586                )
587                .build_v2c(b"public"),
588        );
589
590        let client = mock_client(mock);
591        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])); // system subtree
592
593        let mut pinned = Box::pin(walk);
594        let results = collect_walk(pinned.as_mut(), 10).await;
595
596        // Should terminate immediately with no results
597        assert_eq!(results.len(), 0);
598    }
599
600    #[tokio::test]
601    async fn test_walk_returns_oids_in_sequence() {
602        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
603
604        // Queue three responses in lexicographic order
605        mock.queue_response(
606            ResponseBuilder::new(1)
607                .varbind(
608                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
609                    Value::OctetString("desc".into()),
610                )
611                .build_v2c(b"public"),
612        );
613        mock.queue_response(
614            ResponseBuilder::new(2)
615                .varbind(
616                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
617                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
618                )
619                .build_v2c(b"public"),
620        );
621        mock.queue_response(
622            ResponseBuilder::new(3)
623                .varbind(
624                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
625                    Value::TimeTicks(12345),
626                )
627                .build_v2c(b"public"),
628        );
629        // Fourth response leaves subtree
630        mock.queue_response(
631            ResponseBuilder::new(4)
632                .varbind(
633                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
634                    Value::Integer(1),
635                )
636                .build_v2c(b"public"),
637        );
638
639        let client = mock_client(mock);
640        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
641
642        let mut pinned = Box::pin(walk);
643        let results = collect_walk(pinned.as_mut(), 10).await;
644
645        assert_eq!(results.len(), 3);
646
647        // Verify lexicographic ordering
648        let oids: Vec<_> = results
649            .iter()
650            .filter_map(|r| r.as_ref().ok())
651            .map(|vb| &vb.oid)
652            .collect();
653        for i in 1..oids.len() {
654            assert!(oids[i] > oids[i - 1], "OIDs should be strictly increasing");
655        }
656    }
657
658    #[tokio::test]
659    async fn test_walk_propagates_errors() {
660        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
661
662        // First response succeeds
663        mock.queue_response(
664            ResponseBuilder::new(1)
665                .varbind(
666                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
667                    Value::OctetString("test".into()),
668                )
669                .build_v2c(b"public"),
670        );
671
672        // Second request times out
673        mock.queue_timeout();
674
675        let client = mock_client(mock);
676        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
677
678        let mut pinned = Box::pin(walk);
679        let results = collect_walk(pinned.as_mut(), 10).await;
680
681        assert_eq!(results.len(), 2);
682        assert!(results[0].is_ok());
683        assert!(results[1].is_err());
684    }
685
686    #[tokio::test]
687    async fn test_bulk_walk_terminates_on_end_of_mib_view() {
688        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
689
690        // GETBULK response with multiple varbinds, last one is EndOfMibView
691        mock.queue_response(
692            ResponseBuilder::new(1)
693                .varbind(
694                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
695                    Value::OctetString("desc".into()),
696                )
697                .varbind(
698                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
699                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
700                )
701                .varbind(
702                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
703                    Value::EndOfMibView,
704                )
705                .build_v2c(b"public"),
706        );
707
708        let client = mock_client(mock);
709        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
710
711        let mut pinned = Box::pin(walk);
712        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
713
714        // Should return 2 valid results before EndOfMibView terminates
715        assert_eq!(results.len(), 2);
716        assert!(results.iter().all(|r| r.is_ok()));
717    }
718
719    #[tokio::test]
720    async fn test_bulk_walk_terminates_when_leaving_subtree() {
721        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
722
723        // GETBULK returns varbinds, some in subtree, one outside
724        mock.queue_response(
725            ResponseBuilder::new(1)
726                .varbind(
727                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
728                    Value::OctetString("desc".into()),
729                )
730                .varbind(
731                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
732                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
733                )
734                .varbind(
735                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces - outside system
736                    Value::Integer(1),
737                )
738                .build_v2c(b"public"),
739        );
740
741        let client = mock_client(mock);
742        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
743
744        let mut pinned = Box::pin(walk);
745        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
746
747        // Should return 2 results (third OID is outside subtree)
748        assert_eq!(results.len(), 2);
749    }
750
751    #[tokio::test]
752    async fn test_bulk_walk_handles_empty_response() {
753        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
754
755        // Empty GETBULK response (no varbinds)
756        mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));
757
758        let client = mock_client(mock);
759        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
760
761        let mut pinned = Box::pin(walk);
762        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
763
764        // Should return empty
765        assert_eq!(results.len(), 0);
766    }
767
768    // Tests for non-increasing OID detection.
769    // These prevent infinite loops on non-conformant SNMP agents.
770
771    #[tokio::test]
772    async fn test_walk_errors_on_decreasing_oid() {
773        use crate::error::Error;
774
775        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
776
777        // First response: .1.3.6.1.2.1.1.5.0
778        mock.queue_response(
779            ResponseBuilder::new(1)
780                .varbind(
781                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
782                    Value::OctetString("host1".into()),
783                )
784                .build_v2c(b"public"),
785        );
786
787        // Second response: .1.3.6.1.2.1.1.4.0 (DECREASING - goes backwards!)
788        mock.queue_response(
789            ResponseBuilder::new(2)
790                .varbind(
791                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
792                    Value::OctetString("admin".into()),
793                )
794                .build_v2c(b"public"),
795        );
796
797        let client = mock_client(mock);
798        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
799
800        let mut pinned = Box::pin(walk);
801        let results = collect_walk(pinned.as_mut(), 10).await;
802
803        // Should get first result OK, then error on second
804        assert_eq!(results.len(), 2);
805        assert!(results[0].is_ok());
806        assert!(matches!(
807            &results[1],
808            Err(Error::NonIncreasingOid { previous, current })
809            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0])
810               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0])
811        ));
812    }
813
814    #[tokio::test]
815    async fn test_walk_errors_on_same_oid_returned_twice() {
816        use crate::error::Error;
817
818        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
819
820        // First response: .1.3.6.1.2.1.1.1.0
821        mock.queue_response(
822            ResponseBuilder::new(1)
823                .varbind(
824                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
825                    Value::OctetString("desc".into()),
826                )
827                .build_v2c(b"public"),
828        );
829
830        // Second response: same OID again! (would cause infinite loop)
831        mock.queue_response(
832            ResponseBuilder::new(2)
833                .varbind(
834                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
835                    Value::OctetString("desc".into()),
836                )
837                .build_v2c(b"public"),
838        );
839
840        let client = mock_client(mock);
841        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
842
843        let mut pinned = Box::pin(walk);
844        let results = collect_walk(pinned.as_mut(), 10).await;
845
846        // Should get first result OK, then error on second
847        assert_eq!(results.len(), 2);
848        assert!(results[0].is_ok());
849        assert!(matches!(
850            &results[1],
851            Err(Error::NonIncreasingOid { previous, current })
852            if previous == current
853        ));
854    }
855
856    #[tokio::test]
857    async fn test_bulk_walk_errors_on_non_increasing_oid() {
858        use crate::error::Error;
859
860        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
861
862        // First GETBULK response with non-increasing OID in the batch
863        mock.queue_response(
864            ResponseBuilder::new(1)
865                .varbind(
866                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
867                    Value::OctetString("desc".into()),
868                )
869                .varbind(
870                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
871                    Value::TimeTicks(12345),
872                )
873                .varbind(
874                    // Non-increasing: .1.2.0 < .3.0 (goes backwards)
875                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
876                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
877                )
878                .build_v2c(b"public"),
879        );
880
881        let client = mock_client(mock);
882        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
883
884        let mut pinned = Box::pin(walk);
885        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
886
887        // Should get first two results OK, then error on third
888        assert_eq!(results.len(), 3);
889        assert!(results[0].is_ok());
890        assert!(results[1].is_ok());
891        assert!(matches!(
892            &results[2],
893            Err(Error::NonIncreasingOid { previous, current })
894            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0])
895               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0])
896        ));
897    }
898
899    // Tests for AllowNonIncreasing OID ordering mode
900
901    #[tokio::test]
902    async fn test_walk_allow_non_increasing_accepts_out_of_order() {
903        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
904
905        // Three OIDs out of order, but no duplicates
906        mock.queue_response(
907            ResponseBuilder::new(1)
908                .varbind(
909                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
910                    Value::OctetString("five".into()),
911                )
912                .build_v2c(b"public"),
913        );
914        mock.queue_response(
915            ResponseBuilder::new(2)
916                .varbind(
917                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]), // out of order
918                    Value::OctetString("three".into()),
919                )
920                .build_v2c(b"public"),
921        );
922        mock.queue_response(
923            ResponseBuilder::new(3)
924                .varbind(
925                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 7, 0]),
926                    Value::OctetString("seven".into()),
927                )
928                .build_v2c(b"public"),
929        );
930        // Fourth response leaves subtree
931        mock.queue_response(
932            ResponseBuilder::new(4)
933                .varbind(
934                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
935                    Value::Integer(1),
936                )
937                .build_v2c(b"public"),
938        );
939
940        let client = mock_client(mock);
941        let walk = Walk::new(
942            client,
943            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
944            OidOrdering::AllowNonIncreasing,
945            None,
946        );
947
948        let mut pinned = Box::pin(walk);
949        let results = collect_walk(pinned.as_mut(), 10).await;
950
951        // Should get all three results successfully
952        assert_eq!(results.len(), 3);
953        assert!(results.iter().all(|r| r.is_ok()));
954    }
955
956    #[tokio::test]
957    async fn test_walk_allow_non_increasing_detects_cycle() {
958        use crate::error::Error;
959
960        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
961
962        // First OID
963        mock.queue_response(
964            ResponseBuilder::new(1)
965                .varbind(
966                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
967                    Value::OctetString("first".into()),
968                )
969                .build_v2c(b"public"),
970        );
971        // Second OID
972        mock.queue_response(
973            ResponseBuilder::new(2)
974                .varbind(
975                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
976                    Value::OctetString("second".into()),
977                )
978                .build_v2c(b"public"),
979        );
980        // Same as first - cycle!
981        mock.queue_response(
982            ResponseBuilder::new(3)
983                .varbind(
984                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
985                    Value::OctetString("first-again".into()),
986                )
987                .build_v2c(b"public"),
988        );
989
990        let client = mock_client(mock);
991        let walk = Walk::new(
992            client,
993            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
994            OidOrdering::AllowNonIncreasing,
995            None,
996        );
997
998        let mut pinned = Box::pin(walk);
999        let results = collect_walk(pinned.as_mut(), 10).await;
1000
1001        // Should get first two OK, then DuplicateOid error
1002        assert_eq!(results.len(), 3);
1003        assert!(results[0].is_ok());
1004        assert!(results[1].is_ok());
1005        assert!(matches!(
1006            &results[2],
1007            Err(Error::DuplicateOid { oid })
1008            if oid == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0])
1009        ));
1010    }
1011
1012    // Tests for max_results limit
1013
1014    #[tokio::test]
1015    async fn test_walk_respects_max_results() {
1016        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1017
1018        // Queue many responses
1019        for i in 1..=10 {
1020            mock.queue_response(
1021                ResponseBuilder::new(i)
1022                    .varbind(
1023                        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, i as u32, 0]),
1024                        Value::Integer(i),
1025                    )
1026                    .build_v2c(b"public"),
1027            );
1028        }
1029
1030        let client = mock_client(mock);
1031        let walk = Walk::new(
1032            client,
1033            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1034            OidOrdering::Strict,
1035            Some(3), // Limit to 3 results
1036        );
1037
1038        let mut pinned = Box::pin(walk);
1039        let results = collect_walk(pinned.as_mut(), 20).await;
1040
1041        // Should stop after 3 results
1042        assert_eq!(results.len(), 3);
1043        assert!(results.iter().all(|r| r.is_ok()));
1044    }
1045
1046    #[tokio::test]
1047    async fn test_bulk_walk_respects_max_results() {
1048        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1049
1050        // GETBULK returns many varbinds at once
1051        mock.queue_response(
1052            ResponseBuilder::new(1)
1053                .varbind(
1054                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1055                    Value::Integer(1),
1056                )
1057                .varbind(
1058                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1059                    Value::Integer(2),
1060                )
1061                .varbind(
1062                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
1063                    Value::Integer(3),
1064                )
1065                .varbind(
1066                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
1067                    Value::Integer(4),
1068                )
1069                .varbind(
1070                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
1071                    Value::Integer(5),
1072                )
1073                .build_v2c(b"public"),
1074        );
1075
1076        let client = mock_client(mock);
1077        let walk = BulkWalk::new(
1078            client,
1079            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1080            10,
1081            OidOrdering::Strict,
1082            Some(3), // Limit to 3 results
1083        );
1084
1085        let mut pinned = Box::pin(walk);
1086        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
1087
1088        // Should stop after 3 results even though buffer has more
1089        assert_eq!(results.len(), 3);
1090        assert!(results.iter().all(|r| r.is_ok()));
1091    }
1092
1093    // Tests for inherent next() and collect() methods
1094
1095    #[tokio::test]
1096    async fn test_walk_inherent_next() {
1097        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1098
1099        mock.queue_response(
1100            ResponseBuilder::new(1)
1101                .varbind(
1102                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1103                    Value::OctetString("test".into()),
1104                )
1105                .build_v2c(b"public"),
1106        );
1107        mock.queue_response(
1108            ResponseBuilder::new(2)
1109                .varbind(
1110                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1111                    Value::Integer(42),
1112                )
1113                .build_v2c(b"public"),
1114        );
1115        mock.queue_response(
1116            ResponseBuilder::new(3)
1117                .varbind(
1118                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1119                    Value::Integer(1),
1120                )
1121                .build_v2c(b"public"),
1122        );
1123
1124        let client = mock_client(mock);
1125        let mut walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1126
1127        // Use inherent next() method
1128        let first = walk.next().await;
1129        assert!(first.is_some());
1130        assert!(first.unwrap().is_ok());
1131
1132        let second = walk.next().await;
1133        assert!(second.is_some());
1134        assert!(second.unwrap().is_ok());
1135
1136        let third = walk.next().await;
1137        assert!(third.is_none()); // Walk ended
1138    }
1139
1140    #[tokio::test]
1141    async fn test_walk_inherent_collect() {
1142        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1143
1144        mock.queue_response(
1145            ResponseBuilder::new(1)
1146                .varbind(
1147                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1148                    Value::OctetString("test".into()),
1149                )
1150                .build_v2c(b"public"),
1151        );
1152        mock.queue_response(
1153            ResponseBuilder::new(2)
1154                .varbind(
1155                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1156                    Value::Integer(42),
1157                )
1158                .build_v2c(b"public"),
1159        );
1160        mock.queue_response(
1161            ResponseBuilder::new(3)
1162                .varbind(
1163                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1164                    Value::Integer(1),
1165                )
1166                .build_v2c(b"public"),
1167        );
1168
1169        let client = mock_client(mock);
1170        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1171
1172        // Use inherent collect() method
1173        let results = walk.collect().await.unwrap();
1174        assert_eq!(results.len(), 2);
1175    }
1176
1177    #[tokio::test]
1178    async fn test_bulk_walk_inherent_collect() {
1179        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1180
1181        mock.queue_response(
1182            ResponseBuilder::new(1)
1183                .varbind(
1184                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1185                    Value::OctetString("desc".into()),
1186                )
1187                .varbind(
1188                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1189                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
1190                )
1191                .varbind(
1192                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // outside system
1193                    Value::Integer(1),
1194                )
1195                .build_v2c(b"public"),
1196        );
1197
1198        let client = mock_client(mock);
1199        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
1200
1201        // Use inherent collect() method
1202        let results = walk.collect().await.unwrap();
1203        assert_eq!(results.len(), 2);
1204    }
1205}