1#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23pub enum WalkMode {
24 #[default]
27 Auto,
28 GetNext,
30 GetBulk,
32}
33
34#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub enum OidOrdering {
53 #[default]
58 Strict,
59
60 AllowNonIncreasing,
74}
75
76enum OidTracker {
83 Strict { last: Option<Oid> },
86
87 Relaxed { seen: HashSet<Oid> },
90}
91
92impl OidTracker {
93 fn new(ordering: OidOrdering) -> Self {
94 match ordering {
95 OidOrdering::Strict => OidTracker::Strict { last: None },
96 OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
97 seen: HashSet::new(),
98 },
99 }
100 }
101
102 fn check(&mut self, oid: &Oid) -> Result<()> {
105 match self {
106 OidTracker::Strict { last } => {
107 if let Some(prev) = last
108 && oid <= prev
109 {
110 return Err(Error::NonIncreasingOid {
111 previous: prev.clone(),
112 current: oid.clone(),
113 });
114 }
115 *last = Some(oid.clone());
116 Ok(())
117 }
118 OidTracker::Relaxed { seen } => {
119 if !seen.insert(oid.clone()) {
120 return Err(Error::DuplicateOid { oid: oid.clone() });
121 }
122 Ok(())
123 }
124 }
125 }
126}
127
128pub struct Walk<T: Transport> {
132 client: Client<T>,
133 base_oid: Oid,
134 current_oid: Oid,
135 oid_tracker: OidTracker,
137 max_results: Option<usize>,
139 count: usize,
141 done: bool,
142 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
143}
144
145impl<T: Transport> Walk<T> {
146 pub(crate) fn new(
147 client: Client<T>,
148 oid: Oid,
149 ordering: OidOrdering,
150 max_results: Option<usize>,
151 ) -> Self {
152 Self {
153 client,
154 base_oid: oid.clone(),
155 current_oid: oid,
156 oid_tracker: OidTracker::new(ordering),
157 max_results,
158 count: 0,
159 done: false,
160 pending: None,
161 }
162 }
163}
164
165impl<T: Transport + 'static> Walk<T> {
166 pub async fn next(&mut self) -> Option<Result<VarBind>> {
168 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
169 }
170
171 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
173 let mut results = Vec::new();
174 while let Some(result) = self.next().await {
175 results.push(result?);
176 }
177 Ok(results)
178 }
179}
180
181impl<T: Transport + 'static> Stream for Walk<T> {
182 type Item = Result<VarBind>;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 if self.done {
186 return Poll::Ready(None);
187 }
188
189 if let Some(max) = self.max_results
191 && self.count >= max
192 {
193 self.done = true;
194 return Poll::Ready(None);
195 }
196
197 if self.pending.is_none() {
199 let client = self.client.clone();
201 let oid = self.current_oid.clone();
202
203 let fut = Box::pin(async move { client.get_next(&oid).await });
204 self.pending = Some(fut);
205 }
206
207 let pending = self.pending.as_mut().unwrap();
209 match pending.as_mut().poll(cx) {
210 Poll::Pending => Poll::Pending,
211 Poll::Ready(result) => {
212 self.pending = None;
213
214 match result {
215 Ok(vb) => {
216 if matches!(vb.value, Value::EndOfMibView) {
218 self.done = true;
219 return Poll::Ready(None);
220 }
221
222 if !vb.oid.starts_with(&self.base_oid) {
224 self.done = true;
225 return Poll::Ready(None);
226 }
227
228 if let Err(e) = self.oid_tracker.check(&vb.oid) {
230 self.done = true;
231 return Poll::Ready(Some(Err(e)));
232 }
233
234 self.current_oid = vb.oid.clone();
236 self.count += 1;
237
238 Poll::Ready(Some(Ok(vb)))
239 }
240 Err(e) => {
241 self.done = true;
242 Poll::Ready(Some(Err(e)))
243 }
244 }
245 }
246 }
247 }
248}
249
250pub struct BulkWalk<T: Transport> {
254 client: Client<T>,
255 base_oid: Oid,
256 current_oid: Oid,
257 max_repetitions: i32,
258 oid_tracker: OidTracker,
260 max_results: Option<usize>,
262 count: usize,
264 done: bool,
265 buffer: Vec<VarBind>,
267 buffer_idx: usize,
269 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
270}
271
272impl<T: Transport> BulkWalk<T> {
273 pub(crate) fn new(
274 client: Client<T>,
275 oid: Oid,
276 max_repetitions: i32,
277 ordering: OidOrdering,
278 max_results: Option<usize>,
279 ) -> Self {
280 Self {
281 client,
282 base_oid: oid.clone(),
283 current_oid: oid,
284 max_repetitions,
285 oid_tracker: OidTracker::new(ordering),
286 max_results,
287 count: 0,
288 done: false,
289 buffer: Vec::new(),
290 buffer_idx: 0,
291 pending: None,
292 }
293 }
294}
295
296impl<T: Transport + 'static> BulkWalk<T> {
297 pub async fn next(&mut self) -> Option<Result<VarBind>> {
299 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
300 }
301
302 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
304 let mut results = Vec::new();
305 while let Some(result) = self.next().await {
306 results.push(result?);
307 }
308 Ok(results)
309 }
310}
311
312impl<T: Transport + 'static> Stream for BulkWalk<T> {
313 type Item = Result<VarBind>;
314
315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 loop {
317 if self.done {
318 return Poll::Ready(None);
319 }
320
321 if let Some(max) = self.max_results
323 && self.count >= max
324 {
325 self.done = true;
326 return Poll::Ready(None);
327 }
328
329 if self.buffer_idx < self.buffer.len() {
331 let vb = self.buffer[self.buffer_idx].clone();
332 self.buffer_idx += 1;
333
334 if matches!(vb.value, Value::EndOfMibView) {
336 self.done = true;
337 return Poll::Ready(None);
338 }
339
340 if !vb.oid.starts_with(&self.base_oid) {
342 self.done = true;
343 return Poll::Ready(None);
344 }
345
346 if let Err(e) = self.oid_tracker.check(&vb.oid) {
348 self.done = true;
349 return Poll::Ready(Some(Err(e)));
350 }
351
352 self.current_oid = vb.oid.clone();
354 self.count += 1;
355
356 return Poll::Ready(Some(Ok(vb)));
357 }
358
359 if self.pending.is_none() {
361 let client = self.client.clone();
362 let oid = self.current_oid.clone();
363 let max_rep = self.max_repetitions;
364
365 let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
366 self.pending = Some(fut);
367 }
368
369 let pending = self.pending.as_mut().unwrap();
371 match pending.as_mut().poll(cx) {
372 Poll::Pending => return Poll::Pending,
373 Poll::Ready(result) => {
374 self.pending = None;
375
376 match result {
377 Ok(varbinds) => {
378 if varbinds.is_empty() {
379 self.done = true;
380 return Poll::Ready(None);
381 }
382
383 self.buffer = varbinds;
384 self.buffer_idx = 0;
385 }
387 Err(e) => {
388 self.done = true;
389 return Poll::Ready(Some(Err(e)));
390 }
391 }
392 }
393 }
394 }
395 }
396}
397
398pub enum WalkStream<T: Transport> {
410 GetNext(Walk<T>),
412 GetBulk(BulkWalk<T>),
414}
415
416impl<T: Transport> WalkStream<T> {
417 pub(crate) fn new(
419 client: Client<T>,
420 oid: Oid,
421 version: Version,
422 walk_mode: WalkMode,
423 ordering: OidOrdering,
424 max_results: Option<usize>,
425 max_repetitions: i32,
426 ) -> Result<Self> {
427 let use_bulk = match walk_mode {
428 WalkMode::Auto => version != Version::V1,
429 WalkMode::GetNext => false,
430 WalkMode::GetBulk => {
431 if version == Version::V1 {
432 return Err(Error::GetBulkNotSupportedInV1);
433 }
434 true
435 }
436 };
437
438 Ok(if use_bulk {
439 WalkStream::GetBulk(BulkWalk::new(
440 client,
441 oid,
442 max_repetitions,
443 ordering,
444 max_results,
445 ))
446 } else {
447 WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
448 })
449 }
450}
451
452impl<T: Transport + 'static> WalkStream<T> {
453 pub async fn next(&mut self) -> Option<Result<VarBind>> {
455 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
456 }
457
458 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
460 let mut results = Vec::new();
461 while let Some(result) = self.next().await {
462 results.push(result?);
463 }
464 Ok(results)
465 }
466}
467
468impl<T: Transport + 'static> Stream for WalkStream<T> {
469 type Item = Result<VarBind>;
470
471 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472 match self.get_mut() {
474 WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
475 WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
476 }
477 }
478}
479
480#[cfg(test)]
481mod tests {
482 use super::*;
483 use crate::transport::{MockTransport, ResponseBuilder};
484 use crate::{ClientConfig, Version};
485 use bytes::Bytes;
486 use futures_core::Stream;
487 use std::pin::Pin;
488 use std::task::Context;
489 use std::time::Duration;
490
491 fn mock_client(mock: MockTransport) -> Client<MockTransport> {
492 let config = ClientConfig {
493 version: Version::V2c,
494 community: Bytes::from_static(b"public"),
495 timeout: Duration::from_secs(1),
496 retries: 0,
497 max_oids_per_request: 10,
498 v3_security: None,
499 walk_mode: WalkMode::Auto,
500 oid_ordering: OidOrdering::Strict,
501 max_walk_results: None,
502 max_repetitions: 25,
503 };
504 Client::new(mock, config)
505 }
506
507 async fn collect_walk<T: Transport + 'static>(
508 mut walk: Pin<&mut Walk<T>>,
509 limit: usize,
510 ) -> Vec<Result<VarBind>> {
511 use std::future::poll_fn;
512
513 let mut results = Vec::new();
514 while results.len() < limit {
515 let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
516 match item {
517 Some(result) => results.push(result),
518 None => break,
519 }
520 }
521 results
522 }
523
524 async fn collect_bulk_walk<T: Transport + 'static>(
525 mut walk: Pin<&mut BulkWalk<T>>,
526 limit: usize,
527 ) -> Vec<Result<VarBind>> {
528 use std::future::poll_fn;
529
530 let mut results = Vec::new();
531 while results.len() < limit {
532 let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
533 match item {
534 Some(result) => results.push(result),
535 None => break,
536 }
537 }
538 results
539 }
540
541 #[tokio::test]
542 async fn test_walk_terminates_on_end_of_mib_view() {
543 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
544
545 mock.queue_response(
547 ResponseBuilder::new(1)
548 .varbind(
549 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
550 Value::OctetString("test".into()),
551 )
552 .build_v2c(b"public"),
553 );
554
555 mock.queue_response(
557 ResponseBuilder::new(2)
558 .varbind(
559 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
560 Value::EndOfMibView,
561 )
562 .build_v2c(b"public"),
563 );
564
565 let client = mock_client(mock);
566 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
567
568 let mut pinned = Box::pin(walk);
569 let results = collect_walk(pinned.as_mut(), 10).await;
570
571 assert_eq!(results.len(), 1);
572 assert!(results[0].is_ok());
573 }
574
575 #[tokio::test]
576 async fn test_walk_terminates_when_leaving_subtree() {
577 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
578
579 mock.queue_response(
581 ResponseBuilder::new(1)
582 .varbind(
583 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
585 )
586 .build_v2c(b"public"),
587 );
588
589 let client = mock_client(mock);
590 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])); let mut pinned = Box::pin(walk);
593 let results = collect_walk(pinned.as_mut(), 10).await;
594
595 assert_eq!(results.len(), 0);
597 }
598
599 #[tokio::test]
600 async fn test_walk_returns_oids_in_sequence() {
601 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
602
603 mock.queue_response(
605 ResponseBuilder::new(1)
606 .varbind(
607 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
608 Value::OctetString("desc".into()),
609 )
610 .build_v2c(b"public"),
611 );
612 mock.queue_response(
613 ResponseBuilder::new(2)
614 .varbind(
615 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
616 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
617 )
618 .build_v2c(b"public"),
619 );
620 mock.queue_response(
621 ResponseBuilder::new(3)
622 .varbind(
623 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
624 Value::TimeTicks(12345),
625 )
626 .build_v2c(b"public"),
627 );
628 mock.queue_response(
630 ResponseBuilder::new(4)
631 .varbind(
632 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
633 Value::Integer(1),
634 )
635 .build_v2c(b"public"),
636 );
637
638 let client = mock_client(mock);
639 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
640
641 let mut pinned = Box::pin(walk);
642 let results = collect_walk(pinned.as_mut(), 10).await;
643
644 assert_eq!(results.len(), 3);
645
646 let oids: Vec<_> = results
648 .iter()
649 .filter_map(|r| r.as_ref().ok())
650 .map(|vb| &vb.oid)
651 .collect();
652 for i in 1..oids.len() {
653 assert!(oids[i] > oids[i - 1], "OIDs should be strictly increasing");
654 }
655 }
656
657 #[tokio::test]
658 async fn test_walk_propagates_errors() {
659 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
660
661 mock.queue_response(
663 ResponseBuilder::new(1)
664 .varbind(
665 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
666 Value::OctetString("test".into()),
667 )
668 .build_v2c(b"public"),
669 );
670
671 mock.queue_timeout();
673
674 let client = mock_client(mock);
675 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
676
677 let mut pinned = Box::pin(walk);
678 let results = collect_walk(pinned.as_mut(), 10).await;
679
680 assert_eq!(results.len(), 2);
681 assert!(results[0].is_ok());
682 assert!(results[1].is_err());
683 }
684
685 #[tokio::test]
686 async fn test_bulk_walk_terminates_on_end_of_mib_view() {
687 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
688
689 mock.queue_response(
691 ResponseBuilder::new(1)
692 .varbind(
693 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
694 Value::OctetString("desc".into()),
695 )
696 .varbind(
697 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
698 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
699 )
700 .varbind(
701 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
702 Value::EndOfMibView,
703 )
704 .build_v2c(b"public"),
705 );
706
707 let client = mock_client(mock);
708 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
709
710 let mut pinned = Box::pin(walk);
711 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
712
713 assert_eq!(results.len(), 2);
715 assert!(results.iter().all(|r| r.is_ok()));
716 }
717
718 #[tokio::test]
719 async fn test_bulk_walk_terminates_when_leaving_subtree() {
720 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
721
722 mock.queue_response(
724 ResponseBuilder::new(1)
725 .varbind(
726 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
727 Value::OctetString("desc".into()),
728 )
729 .varbind(
730 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
731 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
732 )
733 .varbind(
734 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
736 )
737 .build_v2c(b"public"),
738 );
739
740 let client = mock_client(mock);
741 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
742
743 let mut pinned = Box::pin(walk);
744 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
745
746 assert_eq!(results.len(), 2);
748 }
749
750 #[tokio::test]
751 async fn test_bulk_walk_handles_empty_response() {
752 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
753
754 mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));
756
757 let client = mock_client(mock);
758 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
759
760 let mut pinned = Box::pin(walk);
761 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
762
763 assert_eq!(results.len(), 0);
765 }
766
767 #[tokio::test]
771 async fn test_walk_errors_on_decreasing_oid() {
772 use crate::error::Error;
773
774 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
775
776 mock.queue_response(
778 ResponseBuilder::new(1)
779 .varbind(
780 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
781 Value::OctetString("host1".into()),
782 )
783 .build_v2c(b"public"),
784 );
785
786 mock.queue_response(
788 ResponseBuilder::new(2)
789 .varbind(
790 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
791 Value::OctetString("admin".into()),
792 )
793 .build_v2c(b"public"),
794 );
795
796 let client = mock_client(mock);
797 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
798
799 let mut pinned = Box::pin(walk);
800 let results = collect_walk(pinned.as_mut(), 10).await;
801
802 assert_eq!(results.len(), 2);
804 assert!(results[0].is_ok());
805 assert!(matches!(
806 &results[1],
807 Err(Error::NonIncreasingOid { previous, current })
808 if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0])
809 && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0])
810 ));
811 }
812
813 #[tokio::test]
814 async fn test_walk_errors_on_same_oid_returned_twice() {
815 use crate::error::Error;
816
817 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
818
819 mock.queue_response(
821 ResponseBuilder::new(1)
822 .varbind(
823 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
824 Value::OctetString("desc".into()),
825 )
826 .build_v2c(b"public"),
827 );
828
829 mock.queue_response(
831 ResponseBuilder::new(2)
832 .varbind(
833 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
834 Value::OctetString("desc".into()),
835 )
836 .build_v2c(b"public"),
837 );
838
839 let client = mock_client(mock);
840 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
841
842 let mut pinned = Box::pin(walk);
843 let results = collect_walk(pinned.as_mut(), 10).await;
844
845 assert_eq!(results.len(), 2);
847 assert!(results[0].is_ok());
848 assert!(matches!(
849 &results[1],
850 Err(Error::NonIncreasingOid { previous, current })
851 if previous == current
852 ));
853 }
854
855 #[tokio::test]
856 async fn test_bulk_walk_errors_on_non_increasing_oid() {
857 use crate::error::Error;
858
859 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
860
861 mock.queue_response(
863 ResponseBuilder::new(1)
864 .varbind(
865 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
866 Value::OctetString("desc".into()),
867 )
868 .varbind(
869 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
870 Value::TimeTicks(12345),
871 )
872 .varbind(
873 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
875 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
876 )
877 .build_v2c(b"public"),
878 );
879
880 let client = mock_client(mock);
881 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
882
883 let mut pinned = Box::pin(walk);
884 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
885
886 assert_eq!(results.len(), 3);
888 assert!(results[0].is_ok());
889 assert!(results[1].is_ok());
890 assert!(matches!(
891 &results[2],
892 Err(Error::NonIncreasingOid { previous, current })
893 if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0])
894 && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0])
895 ));
896 }
897
898 #[tokio::test]
901 async fn test_walk_allow_non_increasing_accepts_out_of_order() {
902 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
903
904 mock.queue_response(
906 ResponseBuilder::new(1)
907 .varbind(
908 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
909 Value::OctetString("five".into()),
910 )
911 .build_v2c(b"public"),
912 );
913 mock.queue_response(
914 ResponseBuilder::new(2)
915 .varbind(
916 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]), Value::OctetString("three".into()),
918 )
919 .build_v2c(b"public"),
920 );
921 mock.queue_response(
922 ResponseBuilder::new(3)
923 .varbind(
924 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 7, 0]),
925 Value::OctetString("seven".into()),
926 )
927 .build_v2c(b"public"),
928 );
929 mock.queue_response(
931 ResponseBuilder::new(4)
932 .varbind(
933 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
934 Value::Integer(1),
935 )
936 .build_v2c(b"public"),
937 );
938
939 let client = mock_client(mock);
940 let walk = Walk::new(
941 client,
942 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
943 OidOrdering::AllowNonIncreasing,
944 None,
945 );
946
947 let mut pinned = Box::pin(walk);
948 let results = collect_walk(pinned.as_mut(), 10).await;
949
950 assert_eq!(results.len(), 3);
952 assert!(results.iter().all(|r| r.is_ok()));
953 }
954
955 #[tokio::test]
956 async fn test_walk_allow_non_increasing_detects_cycle() {
957 use crate::error::Error;
958
959 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
960
961 mock.queue_response(
963 ResponseBuilder::new(1)
964 .varbind(
965 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
966 Value::OctetString("first".into()),
967 )
968 .build_v2c(b"public"),
969 );
970 mock.queue_response(
972 ResponseBuilder::new(2)
973 .varbind(
974 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
975 Value::OctetString("second".into()),
976 )
977 .build_v2c(b"public"),
978 );
979 mock.queue_response(
981 ResponseBuilder::new(3)
982 .varbind(
983 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
984 Value::OctetString("first-again".into()),
985 )
986 .build_v2c(b"public"),
987 );
988
989 let client = mock_client(mock);
990 let walk = Walk::new(
991 client,
992 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
993 OidOrdering::AllowNonIncreasing,
994 None,
995 );
996
997 let mut pinned = Box::pin(walk);
998 let results = collect_walk(pinned.as_mut(), 10).await;
999
1000 assert_eq!(results.len(), 3);
1002 assert!(results[0].is_ok());
1003 assert!(results[1].is_ok());
1004 assert!(matches!(
1005 &results[2],
1006 Err(Error::DuplicateOid { oid })
1007 if oid == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0])
1008 ));
1009 }
1010
1011 #[tokio::test]
1014 async fn test_walk_respects_max_results() {
1015 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1016
1017 for i in 1..=10 {
1019 mock.queue_response(
1020 ResponseBuilder::new(i)
1021 .varbind(
1022 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, i as u32, 0]),
1023 Value::Integer(i),
1024 )
1025 .build_v2c(b"public"),
1026 );
1027 }
1028
1029 let client = mock_client(mock);
1030 let walk = Walk::new(
1031 client,
1032 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1033 OidOrdering::Strict,
1034 Some(3), );
1036
1037 let mut pinned = Box::pin(walk);
1038 let results = collect_walk(pinned.as_mut(), 20).await;
1039
1040 assert_eq!(results.len(), 3);
1042 assert!(results.iter().all(|r| r.is_ok()));
1043 }
1044
1045 #[tokio::test]
1046 async fn test_bulk_walk_respects_max_results() {
1047 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1048
1049 mock.queue_response(
1051 ResponseBuilder::new(1)
1052 .varbind(
1053 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1054 Value::Integer(1),
1055 )
1056 .varbind(
1057 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1058 Value::Integer(2),
1059 )
1060 .varbind(
1061 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
1062 Value::Integer(3),
1063 )
1064 .varbind(
1065 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
1066 Value::Integer(4),
1067 )
1068 .varbind(
1069 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
1070 Value::Integer(5),
1071 )
1072 .build_v2c(b"public"),
1073 );
1074
1075 let client = mock_client(mock);
1076 let walk = BulkWalk::new(
1077 client,
1078 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1079 10,
1080 OidOrdering::Strict,
1081 Some(3), );
1083
1084 let mut pinned = Box::pin(walk);
1085 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
1086
1087 assert_eq!(results.len(), 3);
1089 assert!(results.iter().all(|r| r.is_ok()));
1090 }
1091
1092 #[tokio::test]
1095 async fn test_walk_inherent_next() {
1096 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1097
1098 mock.queue_response(
1099 ResponseBuilder::new(1)
1100 .varbind(
1101 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1102 Value::OctetString("test".into()),
1103 )
1104 .build_v2c(b"public"),
1105 );
1106 mock.queue_response(
1107 ResponseBuilder::new(2)
1108 .varbind(
1109 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1110 Value::Integer(42),
1111 )
1112 .build_v2c(b"public"),
1113 );
1114 mock.queue_response(
1115 ResponseBuilder::new(3)
1116 .varbind(
1117 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1119 )
1120 .build_v2c(b"public"),
1121 );
1122
1123 let client = mock_client(mock);
1124 let mut walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1125
1126 let first = walk.next().await;
1128 assert!(first.is_some());
1129 assert!(first.unwrap().is_ok());
1130
1131 let second = walk.next().await;
1132 assert!(second.is_some());
1133 assert!(second.unwrap().is_ok());
1134
1135 let third = walk.next().await;
1136 assert!(third.is_none()); }
1138
1139 #[tokio::test]
1140 async fn test_walk_inherent_collect() {
1141 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1142
1143 mock.queue_response(
1144 ResponseBuilder::new(1)
1145 .varbind(
1146 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1147 Value::OctetString("test".into()),
1148 )
1149 .build_v2c(b"public"),
1150 );
1151 mock.queue_response(
1152 ResponseBuilder::new(2)
1153 .varbind(
1154 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1155 Value::Integer(42),
1156 )
1157 .build_v2c(b"public"),
1158 );
1159 mock.queue_response(
1160 ResponseBuilder::new(3)
1161 .varbind(
1162 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1164 )
1165 .build_v2c(b"public"),
1166 );
1167
1168 let client = mock_client(mock);
1169 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1170
1171 let results = walk.collect().await.unwrap();
1173 assert_eq!(results.len(), 2);
1174 }
1175
1176 #[tokio::test]
1177 async fn test_bulk_walk_inherent_collect() {
1178 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1179
1180 mock.queue_response(
1181 ResponseBuilder::new(1)
1182 .varbind(
1183 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1184 Value::OctetString("desc".into()),
1185 )
1186 .varbind(
1187 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1188 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
1189 )
1190 .varbind(
1191 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1193 )
1194 .build_v2c(b"public"),
1195 );
1196
1197 let client = mock_client(mock);
1198 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
1199
1200 let results = walk.collect().await.unwrap();
1202 assert_eq!(results.len(), 2);
1203 }
1204}