1#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
/// Request strategy used to traverse a subtree.
///
/// `WalkStream::new` combines this with the protocol version to pick either
/// the GETNEXT-based `Walk` or the GETBULK-based `BulkWalk`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum WalkMode {
    /// Choose automatically: GETBULK unless the version is `Version::V1`.
    /// This is the default.
    #[default]
    Auto,
    /// Always walk with GETNEXT requests.
    GetNext,
    /// Always walk with GETBULK requests; rejected with
    /// `Error::GetBulkNotSupportedInV1` when the version is `Version::V1`.
    GetBulk,
}
33
/// Policy applied to the sequence of OIDs an agent returns during a walk.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum OidOrdering {
    /// Each returned OID must be strictly greater than the previous one;
    /// otherwise the walk fails with `Error::NonIncreasingOid`.
    /// This is the default.
    #[default]
    Strict,

    /// Out-of-order OIDs are accepted. Only an OID that has already been
    /// returned during the same walk is rejected, with `Error::DuplicateOid`.
    AllowNonIncreasing,
}
51
52enum OidTracker {
59 Strict { last: Option<Oid> },
62
63 Relaxed { seen: HashSet<Oid> },
66}
67
68impl OidTracker {
69 fn new(ordering: OidOrdering) -> Self {
70 match ordering {
71 OidOrdering::Strict => OidTracker::Strict { last: None },
72 OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
73 seen: HashSet::new(),
74 },
75 }
76 }
77
78 fn check(&mut self, oid: &Oid) -> Result<()> {
81 match self {
82 OidTracker::Strict { last } => {
83 if let Some(prev) = last
84 && oid <= prev
85 {
86 return Err(Error::NonIncreasingOid {
87 previous: prev.clone(),
88 current: oid.clone(),
89 });
90 }
91 *last = Some(oid.clone());
92 Ok(())
93 }
94 OidTracker::Relaxed { seen } => {
95 if !seen.insert(oid.clone()) {
96 return Err(Error::DuplicateOid { oid: oid.clone() });
97 }
98 Ok(())
99 }
100 }
101 }
102}
103
104pub struct Walk<T: Transport> {
108 client: Client<T>,
109 base_oid: Oid,
110 current_oid: Oid,
111 oid_tracker: OidTracker,
113 max_results: Option<usize>,
115 count: usize,
117 done: bool,
118 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
119}
120
121impl<T: Transport> Walk<T> {
122 pub(crate) fn new(
123 client: Client<T>,
124 oid: Oid,
125 ordering: OidOrdering,
126 max_results: Option<usize>,
127 ) -> Self {
128 Self {
129 client,
130 base_oid: oid.clone(),
131 current_oid: oid,
132 oid_tracker: OidTracker::new(ordering),
133 max_results,
134 count: 0,
135 done: false,
136 pending: None,
137 }
138 }
139}
140
141impl<T: Transport + 'static> Walk<T> {
142 pub async fn next(&mut self) -> Option<Result<VarBind>> {
144 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
145 }
146
147 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
149 let mut results = Vec::new();
150 while let Some(result) = self.next().await {
151 results.push(result?);
152 }
153 Ok(results)
154 }
155}
156
157impl<T: Transport + 'static> Stream for Walk<T> {
158 type Item = Result<VarBind>;
159
160 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 if self.done {
162 return Poll::Ready(None);
163 }
164
165 if let Some(max) = self.max_results
167 && self.count >= max
168 {
169 self.done = true;
170 return Poll::Ready(None);
171 }
172
173 if self.pending.is_none() {
175 let client = self.client.clone();
177 let oid = self.current_oid.clone();
178
179 let fut = Box::pin(async move { client.get_next(&oid).await });
180 self.pending = Some(fut);
181 }
182
183 let pending = self.pending.as_mut().unwrap();
185 match pending.as_mut().poll(cx) {
186 Poll::Pending => Poll::Pending,
187 Poll::Ready(result) => {
188 self.pending = None;
189
190 match result {
191 Ok(vb) => {
192 if matches!(vb.value, Value::EndOfMibView) {
194 self.done = true;
195 return Poll::Ready(None);
196 }
197
198 if !vb.oid.starts_with(&self.base_oid) {
200 self.done = true;
201 return Poll::Ready(None);
202 }
203
204 if let Err(e) = self.oid_tracker.check(&vb.oid) {
206 self.done = true;
207 return Poll::Ready(Some(Err(e)));
208 }
209
210 self.current_oid = vb.oid.clone();
212 self.count += 1;
213
214 Poll::Ready(Some(Ok(vb)))
215 }
216 Err(e) => {
217 self.done = true;
218 Poll::Ready(Some(Err(e)))
219 }
220 }
221 }
222 }
223 }
224}
225
226pub struct BulkWalk<T: Transport> {
230 client: Client<T>,
231 base_oid: Oid,
232 current_oid: Oid,
233 max_repetitions: i32,
234 oid_tracker: OidTracker,
236 max_results: Option<usize>,
238 count: usize,
240 done: bool,
241 buffer: Vec<VarBind>,
243 buffer_idx: usize,
245 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
246}
247
248impl<T: Transport> BulkWalk<T> {
249 pub(crate) fn new(
250 client: Client<T>,
251 oid: Oid,
252 max_repetitions: i32,
253 ordering: OidOrdering,
254 max_results: Option<usize>,
255 ) -> Self {
256 Self {
257 client,
258 base_oid: oid.clone(),
259 current_oid: oid,
260 max_repetitions,
261 oid_tracker: OidTracker::new(ordering),
262 max_results,
263 count: 0,
264 done: false,
265 buffer: Vec::new(),
266 buffer_idx: 0,
267 pending: None,
268 }
269 }
270}
271
272impl<T: Transport + 'static> BulkWalk<T> {
273 pub async fn next(&mut self) -> Option<Result<VarBind>> {
275 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
276 }
277
278 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
280 let mut results = Vec::new();
281 while let Some(result) = self.next().await {
282 results.push(result?);
283 }
284 Ok(results)
285 }
286}
287
288impl<T: Transport + 'static> Stream for BulkWalk<T> {
289 type Item = Result<VarBind>;
290
291 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
292 loop {
293 if self.done {
294 return Poll::Ready(None);
295 }
296
297 if let Some(max) = self.max_results
299 && self.count >= max
300 {
301 self.done = true;
302 return Poll::Ready(None);
303 }
304
305 if self.buffer_idx < self.buffer.len() {
307 let vb = self.buffer[self.buffer_idx].clone();
308 self.buffer_idx += 1;
309
310 if matches!(vb.value, Value::EndOfMibView) {
312 self.done = true;
313 return Poll::Ready(None);
314 }
315
316 if !vb.oid.starts_with(&self.base_oid) {
318 self.done = true;
319 return Poll::Ready(None);
320 }
321
322 if let Err(e) = self.oid_tracker.check(&vb.oid) {
324 self.done = true;
325 return Poll::Ready(Some(Err(e)));
326 }
327
328 self.current_oid = vb.oid.clone();
330 self.count += 1;
331
332 return Poll::Ready(Some(Ok(vb)));
333 }
334
335 if self.pending.is_none() {
337 let client = self.client.clone();
338 let oid = self.current_oid.clone();
339 let max_rep = self.max_repetitions;
340
341 let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
342 self.pending = Some(fut);
343 }
344
345 let pending = self.pending.as_mut().unwrap();
347 match pending.as_mut().poll(cx) {
348 Poll::Pending => return Poll::Pending,
349 Poll::Ready(result) => {
350 self.pending = None;
351
352 match result {
353 Ok(varbinds) => {
354 if varbinds.is_empty() {
355 self.done = true;
356 return Poll::Ready(None);
357 }
358
359 self.buffer = varbinds;
360 self.buffer_idx = 0;
361 }
363 Err(e) => {
364 self.done = true;
365 return Poll::Ready(Some(Err(e)));
366 }
367 }
368 }
369 }
370 }
371 }
372}
373
374pub enum WalkStream<T: Transport> {
386 GetNext(Walk<T>),
388 GetBulk(BulkWalk<T>),
390}
391
392impl<T: Transport> WalkStream<T> {
393 pub(crate) fn new(
395 client: Client<T>,
396 oid: Oid,
397 version: Version,
398 walk_mode: WalkMode,
399 ordering: OidOrdering,
400 max_results: Option<usize>,
401 max_repetitions: i32,
402 ) -> Result<Self> {
403 let use_bulk = match walk_mode {
404 WalkMode::Auto => version != Version::V1,
405 WalkMode::GetNext => false,
406 WalkMode::GetBulk => {
407 if version == Version::V1 {
408 return Err(Error::GetBulkNotSupportedInV1);
409 }
410 true
411 }
412 };
413
414 Ok(if use_bulk {
415 WalkStream::GetBulk(BulkWalk::new(
416 client,
417 oid,
418 max_repetitions,
419 ordering,
420 max_results,
421 ))
422 } else {
423 WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
424 })
425 }
426}
427
428impl<T: Transport + 'static> WalkStream<T> {
429 pub async fn next(&mut self) -> Option<Result<VarBind>> {
431 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
432 }
433
434 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
436 let mut results = Vec::new();
437 while let Some(result) = self.next().await {
438 results.push(result?);
439 }
440 Ok(results)
441 }
442}
443
444impl<T: Transport + 'static> Stream for WalkStream<T> {
445 type Item = Result<VarBind>;
446
447 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
448 match self.get_mut() {
450 WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
451 WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
452 }
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::{MockTransport, ResponseBuilder};
    use crate::{ClientConfig, Version};
    use bytes::Bytes;
    use futures_core::Stream;
    use std::future::poll_fn;
    use std::pin::Pin;
    use std::task::Context;
    use std::time::Duration;

    /// Builds an SNMPv2c client over `mock` with strict ordering, no retries
    /// and no result cap.
    fn mock_client(mock: MockTransport) -> Client<MockTransport> {
        let config = ClientConfig {
            version: Version::V2c,
            community: Bytes::from_static(b"public"),
            timeout: Duration::from_secs(1),
            retries: 0,
            max_oids_per_request: 10,
            v3_security: None,
            walk_mode: WalkMode::Auto,
            oid_ordering: OidOrdering::Strict,
            max_walk_results: None,
            max_repetitions: 25,
        };
        Client::new(mock, config)
    }

    /// Fresh mock transport with the address every test uses.
    fn new_mock() -> MockTransport {
        MockTransport::new("127.0.0.1:161".parse().unwrap())
    }

    /// Root of the subtree walked by every test: `1.3.6.1.2.1.1`.
    fn subtree() -> Oid {
        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])
    }

    /// Scalar instance inside the walked subtree: `1.3.6.1.2.1.1.<column>.0`.
    fn leaf(column: u32) -> Oid {
        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, column, 0])
    }

    /// First OID past the walked subtree: `1.3.6.1.2.1.2.1.0`.
    fn outside() -> Oid {
        Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0])
    }

    /// Object-identifier value shared by several responses.
    fn enterprise() -> Value {
        Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99]))
    }

    /// Queues a v2c response carrying exactly one varbind.
    fn queue_single(mock: &mut MockTransport, request_id: i32, oid: Oid, value: Value) {
        mock.queue_response(
            ResponseBuilder::new(request_id)
                .varbind(oid, value)
                .build_v2c(b"public"),
        );
    }

    /// Polls `stream` until it ends or `limit` items have been gathered.
    ///
    /// Works for both `Walk` and `BulkWalk`, replacing two identical
    /// per-type helpers.
    async fn collect_stream<S>(mut stream: Pin<&mut S>, limit: usize) -> Vec<Result<VarBind>>
    where
        S: Stream<Item = Result<VarBind>>,
    {
        let mut items = Vec::new();
        while items.len() < limit {
            match poll_fn(|cx: &mut Context<'_>| stream.as_mut().poll_next(cx)).await {
                Some(item) => items.push(item),
                None => break,
            }
        }
        items
    }

    #[tokio::test]
    async fn test_walk_terminates_on_end_of_mib_view() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("test".into()));
        queue_single(&mut mock, 2, leaf(1), Value::EndOfMibView);

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 1);
        assert!(results[0].is_ok());
    }

    #[tokio::test]
    async fn test_walk_terminates_when_leaving_subtree() {
        let mut mock = new_mock();
        // The very first response is already outside the walked subtree.
        queue_single(&mut mock, 1, outside(), Value::Integer(1));

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn test_walk_returns_oids_in_sequence() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("desc".into()));
        queue_single(&mut mock, 2, leaf(2), enterprise());
        queue_single(&mut mock, 3, leaf(3), Value::TimeTicks(12345));
        // Terminates the walk by leaving the subtree.
        queue_single(&mut mock, 4, outside(), Value::Integer(1));

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 3);

        let oids: Vec<_> = results
            .iter()
            .filter_map(|r| r.as_ref().ok())
            .map(|vb| &vb.oid)
            .collect();
        for pair in oids.windows(2) {
            assert!(pair[1] > pair[0], "OIDs should be strictly increasing");
        }
    }

    #[tokio::test]
    async fn test_walk_propagates_errors() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("test".into()));
        // The second request fails.
        mock.queue_timeout();

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
    }

    #[tokio::test]
    async fn test_bulk_walk_terminates_on_end_of_mib_view() {
        let mut mock = new_mock();
        mock.queue_response(
            ResponseBuilder::new(1)
                .varbind(leaf(1), Value::OctetString("desc".into()))
                .varbind(leaf(2), enterprise())
                .varbind(leaf(3), Value::EndOfMibView)
                .build_v2c(b"public"),
        );

        let client = mock_client(mock);
        let mut walk = Box::pin(client.bulk_walk(subtree(), 10));
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_bulk_walk_terminates_when_leaving_subtree() {
        let mut mock = new_mock();
        mock.queue_response(
            ResponseBuilder::new(1)
                .varbind(leaf(1), Value::OctetString("desc".into()))
                .varbind(leaf(2), enterprise())
                .varbind(outside(), Value::Integer(1))
                .build_v2c(b"public"),
        );

        let client = mock_client(mock);
        let mut walk = Box::pin(client.bulk_walk(subtree(), 10));
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_bulk_walk_handles_empty_response() {
        let mut mock = new_mock();
        mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));

        let client = mock_client(mock);
        let mut walk = Box::pin(client.bulk_walk(subtree(), 10));
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 0);
    }

    #[tokio::test]
    async fn test_walk_errors_on_decreasing_oid() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(5), Value::OctetString("host1".into()));
        // Goes backwards relative to the previous response.
        queue_single(&mut mock, 2, leaf(4), Value::OctetString("admin".into()));

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(matches!(
            &results[1],
            Err(Error::NonIncreasingOid { previous, current })
                if *previous == leaf(5) && *current == leaf(4)
        ));
    }

    #[tokio::test]
    async fn test_walk_errors_on_same_oid_returned_twice() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("desc".into()));
        queue_single(&mut mock, 2, leaf(1), Value::OctetString("desc".into()));

        let client = mock_client(mock);
        let mut walk = Box::pin(client.walk_getnext(subtree()));
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 2);
        assert!(results[0].is_ok());
        assert!(matches!(
            &results[1],
            Err(Error::NonIncreasingOid { previous, current })
                if previous == current
        ));
    }

    #[tokio::test]
    async fn test_bulk_walk_errors_on_non_increasing_oid() {
        let mut mock = new_mock();
        mock.queue_response(
            ResponseBuilder::new(1)
                .varbind(leaf(1), Value::OctetString("desc".into()))
                .varbind(leaf(3), Value::TimeTicks(12345))
                // Out of order: smaller than the previous varbind.
                .varbind(leaf(2), enterprise())
                .build_v2c(b"public"),
        );

        let client = mock_client(mock);
        let mut walk = Box::pin(client.bulk_walk(subtree(), 10));
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_ok());
        assert!(matches!(
            &results[2],
            Err(Error::NonIncreasingOid { previous, current })
                if *previous == leaf(3) && *current == leaf(2)
        ));
    }

    #[tokio::test]
    async fn test_walk_allow_non_increasing_accepts_out_of_order() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(5), Value::OctetString("five".into()));
        queue_single(&mut mock, 2, leaf(3), Value::OctetString("three".into()));
        queue_single(&mut mock, 3, leaf(7), Value::OctetString("seven".into()));
        // Terminates the walk by leaving the subtree.
        queue_single(&mut mock, 4, outside(), Value::Integer(1));

        let client = mock_client(mock);
        let walk = Walk::new(client, subtree(), OidOrdering::AllowNonIncreasing, None);

        let mut walk = Box::pin(walk);
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_walk_allow_non_increasing_detects_cycle() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("first".into()));
        queue_single(&mut mock, 2, leaf(2), Value::OctetString("second".into()));
        // Same OID as the first response: a cycle.
        queue_single(
            &mut mock,
            3,
            leaf(1),
            Value::OctetString("first-again".into()),
        );

        let client = mock_client(mock);
        let walk = Walk::new(client, subtree(), OidOrdering::AllowNonIncreasing, None);

        let mut walk = Box::pin(walk);
        let results = collect_stream(walk.as_mut(), 10).await;

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_ok());
        assert!(matches!(
            &results[2],
            Err(Error::DuplicateOid { oid })
                if *oid == leaf(1)
        ));
    }

    #[tokio::test]
    async fn test_walk_respects_max_results() {
        let mut mock = new_mock();
        // More responses are available than the cap allows.
        for i in 1..=10 {
            queue_single(&mut mock, i, leaf(i as u32), Value::Integer(i));
        }

        let client = mock_client(mock);
        let walk = Walk::new(client, subtree(), OidOrdering::Strict, Some(3));

        let mut walk = Box::pin(walk);
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_bulk_walk_respects_max_results() {
        let mut mock = new_mock();
        // A single response already holds more varbinds than the cap.
        mock.queue_response(
            ResponseBuilder::new(1)
                .varbind(leaf(1), Value::Integer(1))
                .varbind(leaf(2), Value::Integer(2))
                .varbind(leaf(3), Value::Integer(3))
                .varbind(leaf(4), Value::Integer(4))
                .varbind(leaf(5), Value::Integer(5))
                .build_v2c(b"public"),
        );

        let client = mock_client(mock);
        let walk = BulkWalk::new(client, subtree(), 10, OidOrdering::Strict, Some(3));

        let mut walk = Box::pin(walk);
        let results = collect_stream(walk.as_mut(), 20).await;

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_walk_inherent_next() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("test".into()));
        queue_single(&mut mock, 2, leaf(2), Value::Integer(42));
        // Terminates the walk by leaving the subtree.
        queue_single(&mut mock, 3, outside(), Value::Integer(1));

        let client = mock_client(mock);
        let mut walk = client.walk_getnext(subtree());

        assert!(matches!(walk.next().await, Some(Ok(_))));
        assert!(matches!(walk.next().await, Some(Ok(_))));
        assert!(walk.next().await.is_none());
    }

    #[tokio::test]
    async fn test_walk_inherent_collect() {
        let mut mock = new_mock();
        queue_single(&mut mock, 1, leaf(1), Value::OctetString("test".into()));
        queue_single(&mut mock, 2, leaf(2), Value::Integer(42));
        // Terminates the walk by leaving the subtree.
        queue_single(&mut mock, 3, outside(), Value::Integer(1));

        let client = mock_client(mock);
        let walk = client.walk_getnext(subtree());

        let results = walk.collect().await.unwrap();
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_bulk_walk_inherent_collect() {
        let mut mock = new_mock();
        mock.queue_response(
            ResponseBuilder::new(1)
                .varbind(leaf(1), Value::OctetString("desc".into()))
                .varbind(leaf(2), enterprise())
                .varbind(outside(), Value::Integer(1))
                .build_v2c(b"public"),
        );

        let client = mock_client(mock);
        let walk = client.bulk_walk(subtree(), 10);

        let results = walk.collect().await.unwrap();
        assert_eq!(results.len(), 2);
    }
}