1#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23pub enum WalkMode {
24 #[default]
27 Auto,
28 GetNext,
30 GetBulk,
32}
33
34#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub enum OidOrdering {
53 #[default]
58 Strict,
59
60 AllowNonIncreasing,
74}
75
76enum OidTracker {
83 Strict { last: Option<Oid> },
86
87 Relaxed { seen: HashSet<Oid> },
90}
91
92impl OidTracker {
93 fn new(ordering: OidOrdering) -> Self {
94 match ordering {
95 OidOrdering::Strict => OidTracker::Strict { last: None },
96 OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
97 seen: HashSet::new(),
98 },
99 }
100 }
101
102 fn check(&mut self, oid: &Oid) -> Result<()> {
105 match self {
106 OidTracker::Strict { last } => {
107 if let Some(prev) = last
108 && oid <= prev
109 {
110 return Err(Error::NonIncreasingOid {
111 previous: prev.clone(),
112 current: oid.clone(),
113 });
114 }
115 *last = Some(oid.clone());
116 Ok(())
117 }
118 OidTracker::Relaxed { seen } => {
119 if !seen.insert(oid.clone()) {
120 return Err(Error::DuplicateOid { oid: oid.clone() });
121 }
122 Ok(())
123 }
124 }
125 }
126}
127
128pub struct Walk<T: Transport> {
132 client: Client<T>,
133 base_oid: Oid,
134 current_oid: Oid,
135 oid_tracker: OidTracker,
137 max_results: Option<usize>,
139 count: usize,
141 done: bool,
142 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
143}
144
145impl<T: Transport> Walk<T> {
146 pub(crate) fn new(
147 client: Client<T>,
148 oid: Oid,
149 ordering: OidOrdering,
150 max_results: Option<usize>,
151 ) -> Self {
152 Self {
153 client,
154 base_oid: oid.clone(),
155 current_oid: oid,
156 oid_tracker: OidTracker::new(ordering),
157 max_results,
158 count: 0,
159 done: false,
160 pending: None,
161 }
162 }
163}
164
165impl<T: Transport + 'static> Walk<T> {
166 pub async fn next(&mut self) -> Option<Result<VarBind>> {
168 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
169 }
170
171 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
173 let mut results = Vec::new();
174 while let Some(result) = self.next().await {
175 results.push(result?);
176 }
177 Ok(results)
178 }
179}
180
181impl<T: Transport + 'static> Stream for Walk<T> {
182 type Item = Result<VarBind>;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 if self.done {
186 return Poll::Ready(None);
187 }
188
189 if let Some(max) = self.max_results
191 && self.count >= max
192 {
193 self.done = true;
194 return Poll::Ready(None);
195 }
196
197 if self.pending.is_none() {
199 let client = self.client.clone();
201 let oid = self.current_oid.clone();
202
203 let fut = Box::pin(async move { client.get_next(&oid).await });
204 self.pending = Some(fut);
205 }
206
207 let pending = self.pending.as_mut().unwrap();
209 match pending.as_mut().poll(cx) {
210 Poll::Pending => Poll::Pending,
211 Poll::Ready(result) => {
212 self.pending = None;
213
214 match result {
215 Ok(vb) => {
216 if matches!(vb.value, Value::EndOfMibView) {
218 self.done = true;
219 return Poll::Ready(None);
220 }
221
222 if !vb.oid.starts_with(&self.base_oid) {
224 self.done = true;
225 return Poll::Ready(None);
226 }
227
228 if let Err(e) = self.oid_tracker.check(&vb.oid) {
230 self.done = true;
231 return Poll::Ready(Some(Err(e)));
232 }
233
234 self.current_oid = vb.oid.clone();
236 self.count += 1;
237
238 Poll::Ready(Some(Ok(vb)))
239 }
240 Err(e) => {
241 self.done = true;
242 Poll::Ready(Some(Err(e)))
243 }
244 }
245 }
246 }
247 }
248}
249
250pub struct BulkWalk<T: Transport> {
254 client: Client<T>,
255 base_oid: Oid,
256 current_oid: Oid,
257 max_repetitions: i32,
258 oid_tracker: OidTracker,
260 max_results: Option<usize>,
262 count: usize,
264 done: bool,
265 buffer: Vec<VarBind>,
267 buffer_idx: usize,
269 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
270}
271
272impl<T: Transport> BulkWalk<T> {
273 pub(crate) fn new(
274 client: Client<T>,
275 oid: Oid,
276 max_repetitions: i32,
277 ordering: OidOrdering,
278 max_results: Option<usize>,
279 ) -> Self {
280 Self {
281 client,
282 base_oid: oid.clone(),
283 current_oid: oid,
284 max_repetitions,
285 oid_tracker: OidTracker::new(ordering),
286 max_results,
287 count: 0,
288 done: false,
289 buffer: Vec::new(),
290 buffer_idx: 0,
291 pending: None,
292 }
293 }
294}
295
296impl<T: Transport + 'static> BulkWalk<T> {
297 pub async fn next(&mut self) -> Option<Result<VarBind>> {
299 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
300 }
301
302 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
304 let mut results = Vec::new();
305 while let Some(result) = self.next().await {
306 results.push(result?);
307 }
308 Ok(results)
309 }
310}
311
312impl<T: Transport + 'static> Stream for BulkWalk<T> {
313 type Item = Result<VarBind>;
314
315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 loop {
317 if self.done {
318 return Poll::Ready(None);
319 }
320
321 if let Some(max) = self.max_results
323 && self.count >= max
324 {
325 self.done = true;
326 return Poll::Ready(None);
327 }
328
329 if self.buffer_idx < self.buffer.len() {
331 let vb = self.buffer[self.buffer_idx].clone();
332 self.buffer_idx += 1;
333
334 if matches!(vb.value, Value::EndOfMibView) {
336 self.done = true;
337 return Poll::Ready(None);
338 }
339
340 if !vb.oid.starts_with(&self.base_oid) {
342 self.done = true;
343 return Poll::Ready(None);
344 }
345
346 if let Err(e) = self.oid_tracker.check(&vb.oid) {
348 self.done = true;
349 return Poll::Ready(Some(Err(e)));
350 }
351
352 self.current_oid = vb.oid.clone();
354 self.count += 1;
355
356 return Poll::Ready(Some(Ok(vb)));
357 }
358
359 if self.pending.is_none() {
361 let client = self.client.clone();
362 let oid = self.current_oid.clone();
363 let max_rep = self.max_repetitions;
364
365 let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
366 self.pending = Some(fut);
367 }
368
369 let pending = self.pending.as_mut().unwrap();
371 match pending.as_mut().poll(cx) {
372 Poll::Pending => return Poll::Pending,
373 Poll::Ready(result) => {
374 self.pending = None;
375
376 match result {
377 Ok(varbinds) => {
378 if varbinds.is_empty() {
379 self.done = true;
380 return Poll::Ready(None);
381 }
382
383 self.buffer = varbinds;
384 self.buffer_idx = 0;
385 }
387 Err(e) => {
388 self.done = true;
389 return Poll::Ready(Some(Err(e)));
390 }
391 }
392 }
393 }
394 }
395 }
396}
397
398pub enum WalkStream<T: Transport> {
410 GetNext(Walk<T>),
412 GetBulk(BulkWalk<T>),
414}
415
416impl<T: Transport> WalkStream<T> {
417 pub(crate) fn new(
419 client: Client<T>,
420 oid: Oid,
421 version: Version,
422 walk_mode: WalkMode,
423 ordering: OidOrdering,
424 max_results: Option<usize>,
425 max_repetitions: i32,
426 ) -> Result<Self> {
427 let use_bulk = match walk_mode {
428 WalkMode::Auto => version != Version::V1,
429 WalkMode::GetNext => false,
430 WalkMode::GetBulk => {
431 if version == Version::V1 {
432 return Err(Error::GetBulkNotSupportedInV1);
433 }
434 true
435 }
436 };
437
438 Ok(if use_bulk {
439 WalkStream::GetBulk(BulkWalk::new(
440 client,
441 oid,
442 max_repetitions,
443 ordering,
444 max_results,
445 ))
446 } else {
447 WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
448 })
449 }
450}
451
452impl<T: Transport + 'static> WalkStream<T> {
453 pub async fn next(&mut self) -> Option<Result<VarBind>> {
455 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
456 }
457
458 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
460 let mut results = Vec::new();
461 while let Some(result) = self.next().await {
462 results.push(result?);
463 }
464 Ok(results)
465 }
466}
467
468impl<T: Transport + 'static> Stream for WalkStream<T> {
469 type Item = Result<VarBind>;
470
471 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472 match self.get_mut() {
474 WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
475 WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
476 }
477 }
478}
479
480#[cfg(test)]
481mod tests {
482 use super::*;
483 use crate::client::retry::Retry;
484 use crate::transport::{MockTransport, ResponseBuilder};
485 use crate::{ClientConfig, Version};
486 use bytes::Bytes;
487 use futures_core::Stream;
488 use std::pin::Pin;
489 use std::task::Context;
490 use std::time::Duration;
491
492 fn mock_client(mock: MockTransport) -> Client<MockTransport> {
493 let config = ClientConfig {
494 version: Version::V2c,
495 community: Bytes::from_static(b"public"),
496 timeout: Duration::from_secs(1),
497 retry: Retry::none(),
498 max_oids_per_request: 10,
499 v3_security: None,
500 walk_mode: WalkMode::Auto,
501 oid_ordering: OidOrdering::Strict,
502 max_walk_results: None,
503 max_repetitions: 25,
504 };
505 Client::new(mock, config)
506 }
507
508 async fn collect_walk<T: Transport + 'static>(
509 mut walk: Pin<&mut Walk<T>>,
510 limit: usize,
511 ) -> Vec<Result<VarBind>> {
512 use std::future::poll_fn;
513
514 let mut results = Vec::new();
515 while results.len() < limit {
516 let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
517 match item {
518 Some(result) => results.push(result),
519 None => break,
520 }
521 }
522 results
523 }
524
525 async fn collect_bulk_walk<T: Transport + 'static>(
526 mut walk: Pin<&mut BulkWalk<T>>,
527 limit: usize,
528 ) -> Vec<Result<VarBind>> {
529 use std::future::poll_fn;
530
531 let mut results = Vec::new();
532 while results.len() < limit {
533 let item = poll_fn(|cx: &mut Context<'_>| walk.as_mut().poll_next(cx)).await;
534 match item {
535 Some(result) => results.push(result),
536 None => break,
537 }
538 }
539 results
540 }
541
542 #[tokio::test]
543 async fn test_walk_terminates_on_end_of_mib_view() {
544 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
545
546 mock.queue_response(
548 ResponseBuilder::new(1)
549 .varbind(
550 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
551 Value::OctetString("test".into()),
552 )
553 .build_v2c(b"public"),
554 );
555
556 mock.queue_response(
558 ResponseBuilder::new(2)
559 .varbind(
560 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
561 Value::EndOfMibView,
562 )
563 .build_v2c(b"public"),
564 );
565
566 let client = mock_client(mock);
567 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
568
569 let mut pinned = Box::pin(walk);
570 let results = collect_walk(pinned.as_mut(), 10).await;
571
572 assert_eq!(results.len(), 1);
573 assert!(results[0].is_ok());
574 }
575
576 #[tokio::test]
577 async fn test_walk_terminates_when_leaving_subtree() {
578 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
579
580 mock.queue_response(
582 ResponseBuilder::new(1)
583 .varbind(
584 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
586 )
587 .build_v2c(b"public"),
588 );
589
590 let client = mock_client(mock);
591 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1])); let mut pinned = Box::pin(walk);
594 let results = collect_walk(pinned.as_mut(), 10).await;
595
596 assert_eq!(results.len(), 0);
598 }
599
600 #[tokio::test]
601 async fn test_walk_returns_oids_in_sequence() {
602 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
603
604 mock.queue_response(
606 ResponseBuilder::new(1)
607 .varbind(
608 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
609 Value::OctetString("desc".into()),
610 )
611 .build_v2c(b"public"),
612 );
613 mock.queue_response(
614 ResponseBuilder::new(2)
615 .varbind(
616 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
617 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
618 )
619 .build_v2c(b"public"),
620 );
621 mock.queue_response(
622 ResponseBuilder::new(3)
623 .varbind(
624 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
625 Value::TimeTicks(12345),
626 )
627 .build_v2c(b"public"),
628 );
629 mock.queue_response(
631 ResponseBuilder::new(4)
632 .varbind(
633 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
634 Value::Integer(1),
635 )
636 .build_v2c(b"public"),
637 );
638
639 let client = mock_client(mock);
640 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
641
642 let mut pinned = Box::pin(walk);
643 let results = collect_walk(pinned.as_mut(), 10).await;
644
645 assert_eq!(results.len(), 3);
646
647 let oids: Vec<_> = results
649 .iter()
650 .filter_map(|r| r.as_ref().ok())
651 .map(|vb| &vb.oid)
652 .collect();
653 for i in 1..oids.len() {
654 assert!(oids[i] > oids[i - 1], "OIDs should be strictly increasing");
655 }
656 }
657
658 #[tokio::test]
659 async fn test_walk_propagates_errors() {
660 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
661
662 mock.queue_response(
664 ResponseBuilder::new(1)
665 .varbind(
666 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
667 Value::OctetString("test".into()),
668 )
669 .build_v2c(b"public"),
670 );
671
672 mock.queue_timeout();
674
675 let client = mock_client(mock);
676 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
677
678 let mut pinned = Box::pin(walk);
679 let results = collect_walk(pinned.as_mut(), 10).await;
680
681 assert_eq!(results.len(), 2);
682 assert!(results[0].is_ok());
683 assert!(results[1].is_err());
684 }
685
686 #[tokio::test]
687 async fn test_bulk_walk_terminates_on_end_of_mib_view() {
688 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
689
690 mock.queue_response(
692 ResponseBuilder::new(1)
693 .varbind(
694 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
695 Value::OctetString("desc".into()),
696 )
697 .varbind(
698 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
699 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
700 )
701 .varbind(
702 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
703 Value::EndOfMibView,
704 )
705 .build_v2c(b"public"),
706 );
707
708 let client = mock_client(mock);
709 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
710
711 let mut pinned = Box::pin(walk);
712 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
713
714 assert_eq!(results.len(), 2);
716 assert!(results.iter().all(|r| r.is_ok()));
717 }
718
719 #[tokio::test]
720 async fn test_bulk_walk_terminates_when_leaving_subtree() {
721 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
722
723 mock.queue_response(
725 ResponseBuilder::new(1)
726 .varbind(
727 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
728 Value::OctetString("desc".into()),
729 )
730 .varbind(
731 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
732 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
733 )
734 .varbind(
735 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
737 )
738 .build_v2c(b"public"),
739 );
740
741 let client = mock_client(mock);
742 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
743
744 let mut pinned = Box::pin(walk);
745 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
746
747 assert_eq!(results.len(), 2);
749 }
750
751 #[tokio::test]
752 async fn test_bulk_walk_handles_empty_response() {
753 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
754
755 mock.queue_response(ResponseBuilder::new(1).build_v2c(b"public"));
757
758 let client = mock_client(mock);
759 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
760
761 let mut pinned = Box::pin(walk);
762 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
763
764 assert_eq!(results.len(), 0);
766 }
767
768 #[tokio::test]
772 async fn test_walk_errors_on_decreasing_oid() {
773 use crate::error::Error;
774
775 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
776
777 mock.queue_response(
779 ResponseBuilder::new(1)
780 .varbind(
781 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
782 Value::OctetString("host1".into()),
783 )
784 .build_v2c(b"public"),
785 );
786
787 mock.queue_response(
789 ResponseBuilder::new(2)
790 .varbind(
791 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
792 Value::OctetString("admin".into()),
793 )
794 .build_v2c(b"public"),
795 );
796
797 let client = mock_client(mock);
798 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
799
800 let mut pinned = Box::pin(walk);
801 let results = collect_walk(pinned.as_mut(), 10).await;
802
803 assert_eq!(results.len(), 2);
805 assert!(results[0].is_ok());
806 assert!(matches!(
807 &results[1],
808 Err(Error::NonIncreasingOid { previous, current })
809 if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0])
810 && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0])
811 ));
812 }
813
814 #[tokio::test]
815 async fn test_walk_errors_on_same_oid_returned_twice() {
816 use crate::error::Error;
817
818 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
819
820 mock.queue_response(
822 ResponseBuilder::new(1)
823 .varbind(
824 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
825 Value::OctetString("desc".into()),
826 )
827 .build_v2c(b"public"),
828 );
829
830 mock.queue_response(
832 ResponseBuilder::new(2)
833 .varbind(
834 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
835 Value::OctetString("desc".into()),
836 )
837 .build_v2c(b"public"),
838 );
839
840 let client = mock_client(mock);
841 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
842
843 let mut pinned = Box::pin(walk);
844 let results = collect_walk(pinned.as_mut(), 10).await;
845
846 assert_eq!(results.len(), 2);
848 assert!(results[0].is_ok());
849 assert!(matches!(
850 &results[1],
851 Err(Error::NonIncreasingOid { previous, current })
852 if previous == current
853 ));
854 }
855
856 #[tokio::test]
857 async fn test_bulk_walk_errors_on_non_increasing_oid() {
858 use crate::error::Error;
859
860 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
861
862 mock.queue_response(
864 ResponseBuilder::new(1)
865 .varbind(
866 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
867 Value::OctetString("desc".into()),
868 )
869 .varbind(
870 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
871 Value::TimeTicks(12345),
872 )
873 .varbind(
874 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
876 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
877 )
878 .build_v2c(b"public"),
879 );
880
881 let client = mock_client(mock);
882 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
883
884 let mut pinned = Box::pin(walk);
885 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
886
887 assert_eq!(results.len(), 3);
889 assert!(results[0].is_ok());
890 assert!(results[1].is_ok());
891 assert!(matches!(
892 &results[2],
893 Err(Error::NonIncreasingOid { previous, current })
894 if previous == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0])
895 && current == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0])
896 ));
897 }
898
899 #[tokio::test]
902 async fn test_walk_allow_non_increasing_accepts_out_of_order() {
903 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
904
905 mock.queue_response(
907 ResponseBuilder::new(1)
908 .varbind(
909 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
910 Value::OctetString("five".into()),
911 )
912 .build_v2c(b"public"),
913 );
914 mock.queue_response(
915 ResponseBuilder::new(2)
916 .varbind(
917 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]), Value::OctetString("three".into()),
919 )
920 .build_v2c(b"public"),
921 );
922 mock.queue_response(
923 ResponseBuilder::new(3)
924 .varbind(
925 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 7, 0]),
926 Value::OctetString("seven".into()),
927 )
928 .build_v2c(b"public"),
929 );
930 mock.queue_response(
932 ResponseBuilder::new(4)
933 .varbind(
934 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]),
935 Value::Integer(1),
936 )
937 .build_v2c(b"public"),
938 );
939
940 let client = mock_client(mock);
941 let walk = Walk::new(
942 client,
943 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
944 OidOrdering::AllowNonIncreasing,
945 None,
946 );
947
948 let mut pinned = Box::pin(walk);
949 let results = collect_walk(pinned.as_mut(), 10).await;
950
951 assert_eq!(results.len(), 3);
953 assert!(results.iter().all(|r| r.is_ok()));
954 }
955
956 #[tokio::test]
957 async fn test_walk_allow_non_increasing_detects_cycle() {
958 use crate::error::Error;
959
960 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
961
962 mock.queue_response(
964 ResponseBuilder::new(1)
965 .varbind(
966 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
967 Value::OctetString("first".into()),
968 )
969 .build_v2c(b"public"),
970 );
971 mock.queue_response(
973 ResponseBuilder::new(2)
974 .varbind(
975 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
976 Value::OctetString("second".into()),
977 )
978 .build_v2c(b"public"),
979 );
980 mock.queue_response(
982 ResponseBuilder::new(3)
983 .varbind(
984 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
985 Value::OctetString("first-again".into()),
986 )
987 .build_v2c(b"public"),
988 );
989
990 let client = mock_client(mock);
991 let walk = Walk::new(
992 client,
993 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
994 OidOrdering::AllowNonIncreasing,
995 None,
996 );
997
998 let mut pinned = Box::pin(walk);
999 let results = collect_walk(pinned.as_mut(), 10).await;
1000
1001 assert_eq!(results.len(), 3);
1003 assert!(results[0].is_ok());
1004 assert!(results[1].is_ok());
1005 assert!(matches!(
1006 &results[2],
1007 Err(Error::DuplicateOid { oid })
1008 if oid == &Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0])
1009 ));
1010 }
1011
1012 #[tokio::test]
1015 async fn test_walk_respects_max_results() {
1016 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1017
1018 for i in 1..=10 {
1020 mock.queue_response(
1021 ResponseBuilder::new(i)
1022 .varbind(
1023 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, i as u32, 0]),
1024 Value::Integer(i),
1025 )
1026 .build_v2c(b"public"),
1027 );
1028 }
1029
1030 let client = mock_client(mock);
1031 let walk = Walk::new(
1032 client,
1033 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1034 OidOrdering::Strict,
1035 Some(3), );
1037
1038 let mut pinned = Box::pin(walk);
1039 let results = collect_walk(pinned.as_mut(), 20).await;
1040
1041 assert_eq!(results.len(), 3);
1043 assert!(results.iter().all(|r| r.is_ok()));
1044 }
1045
1046 #[tokio::test]
1047 async fn test_bulk_walk_respects_max_results() {
1048 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1049
1050 mock.queue_response(
1052 ResponseBuilder::new(1)
1053 .varbind(
1054 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1055 Value::Integer(1),
1056 )
1057 .varbind(
1058 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1059 Value::Integer(2),
1060 )
1061 .varbind(
1062 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 3, 0]),
1063 Value::Integer(3),
1064 )
1065 .varbind(
1066 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 4, 0]),
1067 Value::Integer(4),
1068 )
1069 .varbind(
1070 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 5, 0]),
1071 Value::Integer(5),
1072 )
1073 .build_v2c(b"public"),
1074 );
1075
1076 let client = mock_client(mock);
1077 let walk = BulkWalk::new(
1078 client,
1079 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]),
1080 10,
1081 OidOrdering::Strict,
1082 Some(3), );
1084
1085 let mut pinned = Box::pin(walk);
1086 let results = collect_bulk_walk(pinned.as_mut(), 20).await;
1087
1088 assert_eq!(results.len(), 3);
1090 assert!(results.iter().all(|r| r.is_ok()));
1091 }
1092
1093 #[tokio::test]
1096 async fn test_walk_inherent_next() {
1097 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1098
1099 mock.queue_response(
1100 ResponseBuilder::new(1)
1101 .varbind(
1102 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1103 Value::OctetString("test".into()),
1104 )
1105 .build_v2c(b"public"),
1106 );
1107 mock.queue_response(
1108 ResponseBuilder::new(2)
1109 .varbind(
1110 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1111 Value::Integer(42),
1112 )
1113 .build_v2c(b"public"),
1114 );
1115 mock.queue_response(
1116 ResponseBuilder::new(3)
1117 .varbind(
1118 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1120 )
1121 .build_v2c(b"public"),
1122 );
1123
1124 let client = mock_client(mock);
1125 let mut walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1126
1127 let first = walk.next().await;
1129 assert!(first.is_some());
1130 assert!(first.unwrap().is_ok());
1131
1132 let second = walk.next().await;
1133 assert!(second.is_some());
1134 assert!(second.unwrap().is_ok());
1135
1136 let third = walk.next().await;
1137 assert!(third.is_none()); }
1139
1140 #[tokio::test]
1141 async fn test_walk_inherent_collect() {
1142 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1143
1144 mock.queue_response(
1145 ResponseBuilder::new(1)
1146 .varbind(
1147 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1148 Value::OctetString("test".into()),
1149 )
1150 .build_v2c(b"public"),
1151 );
1152 mock.queue_response(
1153 ResponseBuilder::new(2)
1154 .varbind(
1155 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1156 Value::Integer(42),
1157 )
1158 .build_v2c(b"public"),
1159 );
1160 mock.queue_response(
1161 ResponseBuilder::new(3)
1162 .varbind(
1163 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1165 )
1166 .build_v2c(b"public"),
1167 );
1168
1169 let client = mock_client(mock);
1170 let walk = client.walk_getnext(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]));
1171
1172 let results = walk.collect().await.unwrap();
1174 assert_eq!(results.len(), 2);
1175 }
1176
1177 #[tokio::test]
1178 async fn test_bulk_walk_inherent_collect() {
1179 let mut mock = MockTransport::new("127.0.0.1:161".parse().unwrap());
1180
1181 mock.queue_response(
1182 ResponseBuilder::new(1)
1183 .varbind(
1184 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 1, 0]),
1185 Value::OctetString("desc".into()),
1186 )
1187 .varbind(
1188 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1, 2, 0]),
1189 Value::ObjectIdentifier(Oid::from_slice(&[1, 3, 6, 1, 4, 1, 99])),
1190 )
1191 .varbind(
1192 Oid::from_slice(&[1, 3, 6, 1, 2, 1, 2, 1, 0]), Value::Integer(1),
1194 )
1195 .build_v2c(b"public"),
1196 );
1197
1198 let client = mock_client(mock);
1199 let walk = client.bulk_walk(Oid::from_slice(&[1, 3, 6, 1, 2, 1, 1]), 10);
1200
1201 let results = walk.collect().await.unwrap();
1203 assert_eq!(results.len(), 2);
1204 }
1205}