async_snmp/client/
walk.rs

1//! Walk stream implementations.
2
3#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
/// Selects which SNMP operation a walk uses to traverse a subtree.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum WalkMode {
    /// Pick the operation from the SNMP version (this is the default):
    /// GETNEXT for V1, GETBULK for V2c/V3.
    #[default]
    Auto,
    /// Issue GETNEXT requests regardless of version (slower but more compatible).
    GetNext,
    /// Issue GETBULK requests regardless of version (faster, but an error on V1).
    GetBulk,
}
33
/// How a walk reacts when an agent returns OIDs that are not strictly increasing.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum OidOrdering {
    /// Every returned OID must be strictly greater than the previous one (default).
    ///
    /// The walk ends with an error at the first violation. Only the previous OID is
    /// remembered, so this costs O(1) memory and O(1) work per item.
    #[default]
    Strict,

    /// Tolerate out-of-order OIDs, as returned by some buggy agents, while still
    /// detecting cycles.
    ///
    /// Every OID seen is kept in a `HashSet`; the walk ends with an error as soon as
    /// the same OID is returned a second time. Memory use is O(n) in the number of
    /// walk results.
    AllowNonIncreasing,
}
51
52/// Internal OID tracking for walk operations.
53///
54/// This enum implements two strategies for detecting walk termination
55/// conditions due to agent misbehavior:
56/// - `Strict`: O(1) memory, compares against previous OID
57/// - `Relaxed`: O(n) memory, tracks all seen OIDs in a HashSet
58enum OidTracker {
59    /// O(1) memory: stores only the previous OID for comparison.
60    /// Used by default Strict mode.
61    Strict { last: Option<Oid> },
62
63    /// O(n) memory: HashSet of all seen OIDs for cycle detection.
64    /// Only allocated when AllowNonIncreasing is configured.
65    Relaxed { seen: HashSet<Oid> },
66}
67
68impl OidTracker {
69    fn new(ordering: OidOrdering) -> Self {
70        match ordering {
71            OidOrdering::Strict => OidTracker::Strict { last: None },
72            OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
73                seen: HashSet::new(),
74            },
75        }
76    }
77
78    /// Check if OID is valid according to ordering rules.
79    /// Returns Ok(()) if valid, Err if violation detected.
80    fn check(&mut self, oid: &Oid) -> Result<()> {
81        match self {
82            OidTracker::Strict { last } => {
83                if let Some(prev) = last
84                    && oid <= prev
85                {
86                    return Err(Error::NonIncreasingOid {
87                        previous: prev.clone(),
88                        current: oid.clone(),
89                    });
90                }
91                *last = Some(oid.clone());
92                Ok(())
93            }
94            OidTracker::Relaxed { seen } => {
95                if !seen.insert(oid.clone()) {
96                    return Err(Error::DuplicateOid { oid: oid.clone() });
97                }
98                Ok(())
99            }
100        }
101    }
102}
103
104/// Async stream for walking an OID subtree using GETNEXT.
105///
106/// Created by [`Client::walk()`].
107pub struct Walk<T: Transport> {
108    client: Client<T>,
109    base_oid: Oid,
110    current_oid: Oid,
111    /// OID tracker for ordering validation.
112    oid_tracker: OidTracker,
113    /// Maximum number of results to return (None = unlimited).
114    max_results: Option<usize>,
115    /// Count of results returned so far.
116    count: usize,
117    done: bool,
118    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
119}
120
121impl<T: Transport> Walk<T> {
122    pub(crate) fn new(
123        client: Client<T>,
124        oid: Oid,
125        ordering: OidOrdering,
126        max_results: Option<usize>,
127    ) -> Self {
128        Self {
129            client,
130            base_oid: oid.clone(),
131            current_oid: oid,
132            oid_tracker: OidTracker::new(ordering),
133            max_results,
134            count: 0,
135            done: false,
136            pending: None,
137        }
138    }
139}
140
141impl<T: Transport + 'static> Walk<T> {
142    /// Get the next varbind, or None when complete.
143    pub async fn next(&mut self) -> Option<Result<VarBind>> {
144        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
145    }
146
147    /// Collect all remaining varbinds.
148    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
149        let mut results = Vec::new();
150        while let Some(result) = self.next().await {
151            results.push(result?);
152        }
153        Ok(results)
154    }
155}
156
157impl<T: Transport + 'static> Stream for Walk<T> {
158    type Item = Result<VarBind>;
159
160    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161        if self.done {
162            return Poll::Ready(None);
163        }
164
165        // Check max_results limit
166        if let Some(max) = self.max_results
167            && self.count >= max
168        {
169            self.done = true;
170            return Poll::Ready(None);
171        }
172
173        // Check if we have a pending request
174        if self.pending.is_none() {
175            // Start a new GETNEXT request
176            let client = self.client.clone();
177            let oid = self.current_oid.clone();
178
179            let fut = Box::pin(async move { client.get_next(&oid).await });
180            self.pending = Some(fut);
181        }
182
183        // Poll the pending future
184        let pending = self.pending.as_mut().unwrap();
185        match pending.as_mut().poll(cx) {
186            Poll::Pending => Poll::Pending,
187            Poll::Ready(result) => {
188                self.pending = None;
189
190                match result {
191                    Ok(vb) => {
192                        // Check for end conditions
193                        if matches!(vb.value, Value::EndOfMibView) {
194                            self.done = true;
195                            return Poll::Ready(None);
196                        }
197
198                        // Check if OID left the subtree
199                        if !vb.oid.starts_with(&self.base_oid) {
200                            self.done = true;
201                            return Poll::Ready(None);
202                        }
203
204                        // Check OID ordering using the tracker
205                        if let Err(e) = self.oid_tracker.check(&vb.oid) {
206                            self.done = true;
207                            return Poll::Ready(Some(Err(e)));
208                        }
209
210                        // Update current OID for next iteration
211                        self.current_oid = vb.oid.clone();
212                        self.count += 1;
213
214                        Poll::Ready(Some(Ok(vb)))
215                    }
216                    Err(e) => {
217                        self.done = true;
218                        Poll::Ready(Some(Err(e)))
219                    }
220                }
221            }
222        }
223    }
224}
225
226/// Async stream for walking an OID subtree using GETBULK.
227///
228/// Created by [`Client::bulk_walk()`].
229pub struct BulkWalk<T: Transport> {
230    client: Client<T>,
231    base_oid: Oid,
232    current_oid: Oid,
233    max_repetitions: i32,
234    /// OID tracker for ordering validation.
235    oid_tracker: OidTracker,
236    /// Maximum number of results to return (None = unlimited).
237    max_results: Option<usize>,
238    /// Count of results returned so far.
239    count: usize,
240    done: bool,
241    /// Buffered results from the last GETBULK response
242    buffer: Vec<VarBind>,
243    /// Index into the buffer
244    buffer_idx: usize,
245    pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
246}
247
248impl<T: Transport> BulkWalk<T> {
249    pub(crate) fn new(
250        client: Client<T>,
251        oid: Oid,
252        max_repetitions: i32,
253        ordering: OidOrdering,
254        max_results: Option<usize>,
255    ) -> Self {
256        Self {
257            client,
258            base_oid: oid.clone(),
259            current_oid: oid,
260            max_repetitions,
261            oid_tracker: OidTracker::new(ordering),
262            max_results,
263            count: 0,
264            done: false,
265            buffer: Vec::new(),
266            buffer_idx: 0,
267            pending: None,
268        }
269    }
270}
271
272impl<T: Transport + 'static> BulkWalk<T> {
273    /// Get the next varbind, or None when complete.
274    pub async fn next(&mut self) -> Option<Result<VarBind>> {
275        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
276    }
277
278    /// Collect all remaining varbinds.
279    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
280        let mut results = Vec::new();
281        while let Some(result) = self.next().await {
282            results.push(result?);
283        }
284        Ok(results)
285    }
286}
287
288impl<T: Transport + 'static> Stream for BulkWalk<T> {
289    type Item = Result<VarBind>;
290
291    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
292        loop {
293            if self.done {
294                return Poll::Ready(None);
295            }
296
297            // Check max_results limit
298            if let Some(max) = self.max_results
299                && self.count >= max
300            {
301                self.done = true;
302                return Poll::Ready(None);
303            }
304
305            // Check if we have buffered results to return
306            if self.buffer_idx < self.buffer.len() {
307                let vb = self.buffer[self.buffer_idx].clone();
308                self.buffer_idx += 1;
309
310                // Check for end conditions
311                if matches!(vb.value, Value::EndOfMibView) {
312                    self.done = true;
313                    return Poll::Ready(None);
314                }
315
316                // Check if OID left the subtree
317                if !vb.oid.starts_with(&self.base_oid) {
318                    self.done = true;
319                    return Poll::Ready(None);
320                }
321
322                // Check OID ordering using the tracker
323                if let Err(e) = self.oid_tracker.check(&vb.oid) {
324                    self.done = true;
325                    return Poll::Ready(Some(Err(e)));
326                }
327
328                // Update current OID for next request
329                self.current_oid = vb.oid.clone();
330                self.count += 1;
331
332                return Poll::Ready(Some(Ok(vb)));
333            }
334
335            // Buffer exhausted, need to fetch more
336            if self.pending.is_none() {
337                let client = self.client.clone();
338                let oid = self.current_oid.clone();
339                let max_rep = self.max_repetitions;
340
341                let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
342                self.pending = Some(fut);
343            }
344
345            // Poll the pending future
346            let pending = self.pending.as_mut().unwrap();
347            match pending.as_mut().poll(cx) {
348                Poll::Pending => return Poll::Pending,
349                Poll::Ready(result) => {
350                    self.pending = None;
351
352                    match result {
353                        Ok(varbinds) => {
354                            if varbinds.is_empty() {
355                                self.done = true;
356                                return Poll::Ready(None);
357                            }
358
359                            self.buffer = varbinds;
360                            self.buffer_idx = 0;
361                            // Continue loop to process buffer
362                        }
363                        Err(e) => {
364                            self.done = true;
365                            return Poll::Ready(Some(Err(e)));
366                        }
367                    }
368                }
369            }
370        }
371    }
372}
373
// ============================================================================
// Unified WalkStream - selects GETNEXT or GETBULK from the WalkMode and SNMP version
// ============================================================================
377
378/// Unified walk stream that auto-selects between GETNEXT and GETBULK.
379///
380/// Created by [`Client::walk()`] when using `WalkMode::Auto` or explicit mode selection.
381/// This type wraps either a [`Walk`] or [`BulkWalk`] internally based on:
382/// - `WalkMode::Auto`: Uses GETNEXT for V1, GETBULK for V2c/V3
383/// - `WalkMode::GetNext`: Always uses GETNEXT
384/// - `WalkMode::GetBulk`: Always uses GETBULK (fails on V1)
385pub enum WalkStream<T: Transport> {
386    /// GETNEXT-based walk (used for V1 or when explicitly requested)
387    GetNext(Walk<T>),
388    /// GETBULK-based walk (used for V2c/V3 or when explicitly requested)
389    GetBulk(BulkWalk<T>),
390}
391
392impl<T: Transport> WalkStream<T> {
393    /// Create a new walk stream with auto-selection based on version and walk mode.
394    pub(crate) fn new(
395        client: Client<T>,
396        oid: Oid,
397        version: Version,
398        walk_mode: WalkMode,
399        ordering: OidOrdering,
400        max_results: Option<usize>,
401        max_repetitions: i32,
402    ) -> Result<Self> {
403        let use_bulk = match walk_mode {
404            WalkMode::Auto => version != Version::V1,
405            WalkMode::GetNext => false,
406            WalkMode::GetBulk => {
407                if version == Version::V1 {
408                    return Err(Error::GetBulkNotSupportedInV1);
409                }
410                true
411            }
412        };
413
414        Ok(if use_bulk {
415            WalkStream::GetBulk(BulkWalk::new(
416                client,
417                oid,
418                max_repetitions,
419                ordering,
420                max_results,
421            ))
422        } else {
423            WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
424        })
425    }
426}
427
428impl<T: Transport + 'static> WalkStream<T> {
429    /// Get the next varbind, or None when complete.
430    pub async fn next(&mut self) -> Option<Result<VarBind>> {
431        std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
432    }
433
434    /// Collect all remaining varbinds.
435    pub async fn collect(mut self) -> Result<Vec<VarBind>> {
436        let mut results = Vec::new();
437        while let Some(result) = self.next().await {
438            results.push(result?);
439        }
440        Ok(results)
441    }
442}
443
444impl<T: Transport + 'static> Stream for WalkStream<T> {
445    type Item = Result<VarBind>;
446
447    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
448        // SAFETY: We're just projecting the pin to the inner enum variant
449        match self.get_mut() {
450            WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
451            WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
452        }
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459    use crate::transport::{MockTransport, ResponseBuilder};
460    use crate::{ClientConfig, Version};
461    use bytes::Bytes;
462    use futures_core::Stream;
463    use std::pin::Pin;
464    use std::task::Context;
465    use std::time::Duration;
466
467    fn mock_client(mock: MockTransport) -> Client<MockTransport> {
468        let config = ClientConfig {
469            version: Version::V2c,
470            community: Bytes::from_static(b"public"),
471            timeout: Duration::from_secs(1),
472            retries: 0,
473            max_oids_per_request: 10,
474            v3_security: None,
475            walk_mode: WalkMode::Auto,
476            oid_ordering: OidOrdering::Strict,
477            max_walk_results: None,
478            max_repetitions: 25,
479        };
480        Client::new(mock, config)
481    }
482
483    async fn collect_walk<T: Transport + 'static>(
484        mut walk: Pin<&mut Walk<T>>,
485        limit: usize,
486    ) -> Vec<Result<VarBind>> {
487        use std::future::poll_fn;
488
489        let mut results = Vec::new();
490        while results.len() < limit {
491            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
492            match item {
493                Some(result) => results.push(result),
494                None => break,
495            }
496        }
497        results
498    }
499
500    async fn collect_bulk_walk<T: Transport + 'static>(
501        mut walk: Pin<&mut BulkWalk<T>>,
502        limit: usize,
503    ) -> Vec<Result<VarBind>> {
504        use std::future::poll_fn;
505
506        let mut results = Vec::new();
507        while results.len() < limit {
508            let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
509            match item {
510                Some(result) => results.push(result),
511                None => break,
512            }
513        }
514        results
515    }
516
517    #[tokio::test]
518    async fn test_walk_terminates_on_end_of_mib_view() {
519        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
520
521        // First response: valid OID in subtree
522        mock.queue_response(
523            ResponseBuilder::new(1)
524                .varbind(
525                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
526                    Value::OctetString("test".into()),
527                )
528                .build_v2c(b"public"),
529        );
530
531        // Second response: EndOfMibView
532        mock.queue_response(
533            ResponseBuilder::new(2)
534                .varbind(
535                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
536                    Value::EndOfMibView,
537                )
538                .build_v2c(b"public"),
539        );
540
541        let client = mock_client(mock);
542        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
543
544        let mut pinned = Box::pin(walk);
545        let results = collect_walk(pinned.as_mut(), 10).await;
546
547        assert_eq!(results.len(), 1);
548        assert!(results[0].is_ok());
549    }
550
551    #[tokio::test]
552    async fn test_walk_terminates_when_leaving_subtree() {
553        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
554
555        // Response with OID outside the walked subtree (interfaces, not system)
556        mock.queue_response(
557            ResponseBuilder::new(1)
558                .varbind(
559                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces subtree
560                    Value::Integer(1),
561                )
562                .build_v2c(b"public"),
563        );
564
565        let client = mock_client(mock);
566        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])); // system subtree
567
568        let mut pinned = Box::pin(walk);
569        let results = collect_walk(pinned.as_mut(), 10).await;
570
571        // Should terminate immediately with no results
572        assert_eq!(results.len(), 0);
573    }
574
575    #[tokio::test]
576    async fn test_walk_returns_oids_in_sequence() {
577        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
578
579        // Queue three responses in lexicographic order
580        mock.queue_response(
581            ResponseBuilder::new(1)
582                .varbind(
583                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
584                    Value::OctetString("desc".into()),
585                )
586                .build_v2c(b"public"),
587        );
588        mock.queue_response(
589            ResponseBuilder::new(2)
590                .varbind(
591                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
592                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
593                )
594                .build_v2c(b"public"),
595        );
596        mock.queue_response(
597            ResponseBuilder::new(3)
598                .varbind(
599                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
600                    Value::TimeTicks(12345),
601                )
602                .build_v2c(b"public"),
603        );
604        // Fourth response leaves subtree
605        mock.queue_response(
606            ResponseBuilder::new(4)
607                .varbind(
608                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
609                    Value::Integer(1),
610                )
611                .build_v2c(b"public"),
612        );
613
614        let client = mock_client(mock);
615        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
616
617        let mut pinned = Box::pin(walk);
618        let results = collect_walk(pinned.as_mut(), 10).await;
619
620        assert_eq!(results.len(), 3);
621
622        // Verify lexicographic ordering
623        let oids: Vec<_> = results
624            .iter()
625            .filter_map(|r| r.as_ref().ok())
626            .map(|vb| &vb.oid)
627            .collect();
628        for i in 1..oids.len() {
629            assert!(oids[i] > oids[i - 1], "OIDs should be strictly increasing");
630        }
631    }
632
633    #[tokio::test]
634    async fn test_walk_propagates_errors() {
635        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
636
637        // First response succeeds
638        mock.queue_response(
639            ResponseBuilder::new(1)
640                .varbind(
641                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
642                    Value::OctetString("test".into()),
643                )
644                .build_v2c(b"public"),
645        );
646
647        // Second request times out
648        mock.queue_timeout();
649
650        let client = mock_client(mock);
651        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
652
653        let mut pinned = Box::pin(walk);
654        let results = collect_walk(pinned.as_mut(), 10).await;
655
656        assert_eq!(results.len(), 2);
657        assert!(results[0].is_ok());
658        assert!(results[1].is_err());
659    }
660
661    #[tokio::test]
662    async fn test_bulk_walk_terminates_on_end_of_mib_view() {
663        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
664
665        // GETBULK response with multiple varbinds, last one is EndOfMibView
666        mock.queue_response(
667            ResponseBuilder::new(1)
668                .varbind(
669                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
670                    Value::OctetString("desc".into()),
671                )
672                .varbind(
673                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
674                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
675                )
676                .varbind(
677                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
678                    Value::EndOfMibView,
679                )
680                .build_v2c(b"public"),
681        );
682
683        let client = mock_client(mock);
684        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
685
686        let mut pinned = Box::pin(walk);
687        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
688
689        // Should return 2 valid results before EndOfMibView terminates
690        assert_eq!(results.len(), 2);
691        assert!(results.iter().all(|r| r.is_ok()));
692    }
693
694    #[tokio::test]
695    async fn test_bulk_walk_terminates_when_leaving_subtree() {
696        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
697
698        // GETBULK returns varbinds, some in subtree, one outside
699        mock.queue_response(
700            ResponseBuilder::new(1)
701                .varbind(
702                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
703                    Value::OctetString("desc".into()),
704                )
705                .varbind(
706                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
707                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
708                )
709                .varbind(
710                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // interfaces - outside system
711                    Value::Integer(1),
712                )
713                .build_v2c(b"public"),
714        );
715
716        let client = mock_client(mock);
717        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
718
719        let mut pinned = Box::pin(walk);
720        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
721
722        // Should return 2 results (third OID is outside subtree)
723        assert_eq!(results.len(), 2);
724    }
725
726    #[tokio::test]
727    async fn test_bulk_walk_handles_empty_response() {
728        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
729
730        // Empty GETBULK response (no varbinds)
731        mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));
732
733        let client = mock_client(mock);
734        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
735
736        let mut pinned = Box::pin(walk);
737        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
738
739        // Should return empty
740        assert_eq!(results.len(), 0);
741    }
742
743    // Tests for non-increasing OID detection.
744    // These prevent infinite loops on non-conformant SNMP agents.
745
746    #[tokio::test]
747    async fn test_walk_errors_on_decreasing_oid() {
748        use crate::error::Error;
749
750        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
751
752        // First response: .1.3.6.1.2.1.1.5.0
753        mock.queue_response(
754            ResponseBuilder::new(1)
755                .varbind(
756                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
757                    Value::OctetString("host1".into()),
758                )
759                .build_v2c(b"public"),
760        );
761
762        // Second response: .1.3.6.1.2.1.1.4.0 (DECREASING - goes backwards!)
763        mock.queue_response(
764            ResponseBuilder::new(2)
765                .varbind(
766                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
767                    Value::OctetString("admin".into()),
768                )
769                .build_v2c(b"public"),
770        );
771
772        let client = mock_client(mock);
773        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
774
775        let mut pinned = Box::pin(walk);
776        let results = collect_walk(pinned.as_mut(), 10).await;
777
778        // Should get first result OK, then error on second
779        assert_eq!(results.len(), 2);
780        assert!(results[0].is_ok());
781        assert!(matches!(
782            &results[1],
783            Err(Error::NonIncreasingOid { previous, current })
784            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0])
785               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0])
786        ));
787    }
788
789    #[tokio::test]
790    async fn test_walk_errors_on_same_oid_returned_twice() {
791        use crate::error::Error;
792
793        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
794
795        // First response: .1.3.6.1.2.1.1.1.0
796        mock.queue_response(
797            ResponseBuilder::new(1)
798                .varbind(
799                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
800                    Value::OctetString("desc".into()),
801                )
802                .build_v2c(b"public"),
803        );
804
805        // Second response: same OID again! (would cause infinite loop)
806        mock.queue_response(
807            ResponseBuilder::new(2)
808                .varbind(
809                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
810                    Value::OctetString("desc".into()),
811                )
812                .build_v2c(b"public"),
813        );
814
815        let client = mock_client(mock);
816        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
817
818        let mut pinned = Box::pin(walk);
819        let results = collect_walk(pinned.as_mut(), 10).await;
820
821        // Should get first result OK, then error on second
822        assert_eq!(results.len(), 2);
823        assert!(results[0].is_ok());
824        assert!(matches!(
825            &results[1],
826            Err(Error::NonIncreasingOid { previous, current })
827            if previous == current
828        ));
829    }
830
831    #[tokio::test]
832    async fn test_bulk_walk_errors_on_non_increasing_oid() {
833        use crate::error::Error;
834
835        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
836
837        // First GETBULK response with non-increasing OID in the batch
838        mock.queue_response(
839            ResponseBuilder::new(1)
840                .varbind(
841                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
842                    Value::OctetString("desc".into()),
843                )
844                .varbind(
845                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
846                    Value::TimeTicks(12345),
847                )
848                .varbind(
849                    // Non-increasing: .1.2.0 < .3.0 (goes backwards)
850                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
851                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
852                )
853                .build_v2c(b"public"),
854        );
855
856        let client = mock_client(mock);
857        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
858
859        let mut pinned = Box::pin(walk);
860        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
861
862        // Should get first two results OK, then error on third
863        assert_eq!(results.len(), 3);
864        assert!(results[0].is_ok());
865        assert!(results[1].is_ok());
866        assert!(matches!(
867            &results[2],
868            Err(Error::NonIncreasingOid { previous, current })
869            if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0])
870               && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0])
871        ));
872    }
873
874    // Tests for AllowNonIncreasing OID ordering mode
875
876    #[tokio::test]
877    async fn test_walk_allow_non_increasing_accepts_out_of_order() {
878        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
879
880        // Three OIDs out of order, but no duplicates
881        mock.queue_response(
882            ResponseBuilder::new(1)
883                .varbind(
884                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
885                    Value::OctetString("five".into()),
886                )
887                .build_v2c(b"public"),
888        );
889        mock.queue_response(
890            ResponseBuilder::new(2)
891                .varbind(
892                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]), // out of order
893                    Value::OctetString("three".into()),
894                )
895                .build_v2c(b"public"),
896        );
897        mock.queue_response(
898            ResponseBuilder::new(3)
899                .varbind(
900                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 7, 0]),
901                    Value::OctetString("seven".into()),
902                )
903                .build_v2c(b"public"),
904        );
905        // Fourth response leaves subtree
906        mock.queue_response(
907            ResponseBuilder::new(4)
908                .varbind(
909                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
910                    Value::Integer(1),
911                )
912                .build_v2c(b"public"),
913        );
914
915        let client = mock_client(mock);
916        let walk = Walk::new(
917            client,
918            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
919            OidOrdering::AllowNonIncreasing,
920            None,
921        );
922
923        let mut pinned = Box::pin(walk);
924        let results = collect_walk(pinned.as_mut(), 10).await;
925
926        // Should get all three results successfully
927        assert_eq!(results.len(), 3);
928        assert!(results.iter().all(|r| r.is_ok()));
929    }
930
931    #[tokio::test]
932    async fn test_walk_allow_non_increasing_detects_cycle() {
933        use crate::error::Error;
934
935        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
936
937        // First OID
938        mock.queue_response(
939            ResponseBuilder::new(1)
940                .varbind(
941                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
942                    Value::OctetString("first".into()),
943                )
944                .build_v2c(b"public"),
945        );
946        // Second OID
947        mock.queue_response(
948            ResponseBuilder::new(2)
949                .varbind(
950                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
951                    Value::OctetString("second".into()),
952                )
953                .build_v2c(b"public"),
954        );
955        // Same as first - cycle!
956        mock.queue_response(
957            ResponseBuilder::new(3)
958                .varbind(
959                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
960                    Value::OctetString("first-again".into()),
961                )
962                .build_v2c(b"public"),
963        );
964
965        let client = mock_client(mock);
966        let walk = Walk::new(
967            client,
968            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
969            OidOrdering::AllowNonIncreasing,
970            None,
971        );
972
973        let mut pinned = Box::pin(walk);
974        let results = collect_walk(pinned.as_mut(), 10).await;
975
976        // Should get first two OK, then DuplicateOid error
977        assert_eq!(results.len(), 3);
978        assert!(results[0].is_ok());
979        assert!(results[1].is_ok());
980        assert!(matches!(
981            &results[2],
982            Err(Error::DuplicateOid { oid })
983            if oid == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0])
984        ));
985    }
986
987    // Tests for max_results limit
988
989    #[tokio::test]
990    async fn test_walk_respects_max_results() {
991        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
992
993        // Queue many responses
994        for i in 1..=10 {
995            mock.queue_response(
996                ResponseBuilder::new(i)
997                    .varbind(
998                        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, i as u32, 0]),
999                        Value::Integer(i),
1000                    )
1001                    .build_v2c(b"public"),
1002            );
1003        }
1004
1005        let client = mock_client(mock);
1006        let walk = Walk::new(
1007            client,
1008            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1009            OidOrdering::Strict,
1010            Some(3), // Limit to 3 results
1011        );
1012
1013        let mut pinned = Box::pin(walk);
1014        let results = collect_walk(pinned.as_mut(), 20).await;
1015
1016        // Should stop after 3 results
1017        assert_eq!(results.len(), 3);
1018        assert!(results.iter().all(|r| r.is_ok()));
1019    }
1020
1021    #[tokio::test]
1022    async fn test_bulk_walk_respects_max_results() {
1023        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1024
1025        // GETBULK returns many varbinds at once
1026        mock.queue_response(
1027            ResponseBuilder::new(1)
1028                .varbind(
1029                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1030                    Value::Integer(1),
1031                )
1032                .varbind(
1033                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1034                    Value::Integer(2),
1035                )
1036                .varbind(
1037                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
1038                    Value::Integer(3),
1039                )
1040                .varbind(
1041                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
1042                    Value::Integer(4),
1043                )
1044                .varbind(
1045                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
1046                    Value::Integer(5),
1047                )
1048                .build_v2c(b"public"),
1049        );
1050
1051        let client = mock_client(mock);
1052        let walk = BulkWalk::new(
1053            client,
1054            Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1055            10,
1056            OidOrdering::Strict,
1057            Some(3), // Limit to 3 results
1058        );
1059
1060        let mut pinned = Box::pin(walk);
1061        let results = collect_bulk_walk(pinned.as_mut(), 20).await;
1062
1063        // Should stop after 3 results even though buffer has more
1064        assert_eq!(results.len(), 3);
1065        assert!(results.iter().all(|r| r.is_ok()));
1066    }
1067
1068    // Tests for inherent next() and collect() methods
1069
1070    #[tokio::test]
1071    async fn test_walk_inherent_next() {
1072        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1073
1074        mock.queue_response(
1075            ResponseBuilder::new(1)
1076                .varbind(
1077                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1078                    Value::OctetString("test".into()),
1079                )
1080                .build_v2c(b"public"),
1081        );
1082        mock.queue_response(
1083            ResponseBuilder::new(2)
1084                .varbind(
1085                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1086                    Value::Integer(42),
1087                )
1088                .build_v2c(b"public"),
1089        );
1090        mock.queue_response(
1091            ResponseBuilder::new(3)
1092                .varbind(
1093                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1094                    Value::Integer(1),
1095                )
1096                .build_v2c(b"public"),
1097        );
1098
1099        let client = mock_client(mock);
1100        let mut walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1101
1102        // Use inherent next() method
1103        let first = walk.next().await;
1104        assert!(first.is_some());
1105        assert!(first.unwrap().is_ok());
1106
1107        let second = walk.next().await;
1108        assert!(second.is_some());
1109        assert!(second.unwrap().is_ok());
1110
1111        let third = walk.next().await;
1112        assert!(third.is_none()); // Walk ended
1113    }
1114
1115    #[tokio::test]
1116    async fn test_walk_inherent_collect() {
1117        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1118
1119        mock.queue_response(
1120            ResponseBuilder::new(1)
1121                .varbind(
1122                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1123                    Value::OctetString("test".into()),
1124                )
1125                .build_v2c(b"public"),
1126        );
1127        mock.queue_response(
1128            ResponseBuilder::new(2)
1129                .varbind(
1130                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1131                    Value::Integer(42),
1132                )
1133                .build_v2c(b"public"),
1134        );
1135        mock.queue_response(
1136            ResponseBuilder::new(3)
1137                .varbind(
1138                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // leaves subtree
1139                    Value::Integer(1),
1140                )
1141                .build_v2c(b"public"),
1142        );
1143
1144        let client = mock_client(mock);
1145        let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1146
1147        // Use inherent collect() method
1148        let results = walk.collect().await.unwrap();
1149        assert_eq!(results.len(), 2);
1150    }
1151
1152    #[tokio::test]
1153    async fn test_bulk_walk_inherent_collect() {
1154        let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1155
1156        mock.queue_response(
1157            ResponseBuilder::new(1)
1158                .varbind(
1159                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1160                    Value::OctetString("desc".into()),
1161                )
1162                .varbind(
1163                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1164                    Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
1165                )
1166                .varbind(
1167                    Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), // outside system
1168                    Value::Integer(1),
1169                )
1170                .build_v2c(b"public"),
1171        );
1172
1173        let client = mock_client(mock);
1174        let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
1175
1176        // Use inherent collect() method
1177        let results = walk.collect().await.unwrap();
1178        assert_eq!(results.len(), 2);
1179    }
1180}