1#![allow(clippy::type_complexity)]
4
5use std::collections::HashSet;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::Stream;
10
11use crate::error::{Error, Result};
12use crate::oid::Oid;
13use crate::transport::Transport;
14use crate::value::Value;
15use crate::varbind::VarBind;
16use crate::version::Version;
17
18use super::Client;
19
20#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23pub enum WalkMode {
24 #[default]
27 Auto,
28 GetNext,
30 GetBulk,
32}
33
34#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52pub enum OidOrdering {
53 #[default]
58 Strict,
59
60 AllowNonIncreasing,
74}
75
76enum OidTracker {
77 Strict { last: Option<Oid> },
78 Relaxed { seen: HashSet<Oid> },
79}
80
81impl OidTracker {
82 fn new(ordering: OidOrdering) -> Self {
83 match ordering {
84 OidOrdering::Strict => OidTracker::Strict { last: None },
85 OidOrdering::AllowNonIncreasing => OidTracker::Relaxed {
86 seen: HashSet::new(),
87 },
88 }
89 }
90
91 fn check(&mut self, oid: &Oid) -> Result<()> {
92 match self {
93 OidTracker::Strict { last } => {
94 if let Some(prev) = last
95 && oid <= prev
96 {
97 return Err(Error::NonIncreasingOid {
98 previous: prev.clone(),
99 current: oid.clone(),
100 });
101 }
102 *last = Some(oid.clone());
103 Ok(())
104 }
105 OidTracker::Relaxed { seen } => {
106 if !seen.insert(oid.clone()) {
107 return Err(Error::DuplicateOid { oid: oid.clone() });
108 }
109 Ok(())
110 }
111 }
112 }
113}
114
115pub struct Walk<T: Transport> {
119 client: Client<T>,
120 base_oid: Oid,
121 current_oid: Oid,
122 oid_tracker: OidTracker,
124 max_results: Option<usize>,
126 count: usize,
128 done: bool,
129 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<VarBind>> + Send>>>,
130}
131
132impl<T: Transport> Walk<T> {
133 pub(crate) fn new(
134 client: Client<T>,
135 oid: Oid,
136 ordering: OidOrdering,
137 max_results: Option<usize>,
138 ) -> Self {
139 Self {
140 client,
141 base_oid: oid.clone(),
142 current_oid: oid,
143 oid_tracker: OidTracker::new(ordering),
144 max_results,
145 count: 0,
146 done: false,
147 pending: None,
148 }
149 }
150}
151
152impl<T: Transport + 'static> Walk<T> {
153 pub async fn next(&mut self) -> Option<Result<VarBind>> {
155 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
156 }
157
158 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
160 let mut results = Vec::new();
161 while let Some(result) = self.next().await {
162 results.push(result?);
163 }
164 Ok(results)
165 }
166}
167
168impl<T: Transport + 'static> Stream for Walk<T> {
169 type Item = Result<VarBind>;
170
171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 if self.done {
173 return Poll::Ready(None);
174 }
175
176 if let Some(max) = self.max_results
178 && self.count >= max
179 {
180 self.done = true;
181 return Poll::Ready(None);
182 }
183
184 if self.pending.is_none() {
186 let client = self.client.clone();
188 let oid = self.current_oid.clone();
189
190 let fut = Box::pin(async move { client.get_next(&oid).await });
191 self.pending = Some(fut);
192 }
193
194 let pending = self.pending.as_mut().unwrap();
196 match pending.as_mut().poll(cx) {
197 Poll::Pending => Poll::Pending,
198 Poll::Ready(result) => {
199 self.pending = None;
200
201 match result {
202 Ok(vb) => {
203 if matches!(vb.value, Value::EndOfMibView) {
205 self.done = true;
206 return Poll::Ready(None);
207 }
208
209 if !vb.oid.starts_with(&self.base_oid) {
211 self.done = true;
212 return Poll::Ready(None);
213 }
214
215 if let Err(e) = self.oid_tracker.check(&vb.oid) {
217 self.done = true;
218 return Poll::Ready(Some(Err(e)));
219 }
220
221 self.current_oid = vb.oid.clone();
223 self.count += 1;
224
225 Poll::Ready(Some(Ok(vb)))
226 }
227 Err(e) => {
228 self.done = true;
229 Poll::Ready(Some(Err(e)))
230 }
231 }
232 }
233 }
234 }
235}
236
237pub struct BulkWalk<T: Transport> {
241 client: Client<T>,
242 base_oid: Oid,
243 current_oid: Oid,
244 max_repetitions: i32,
245 oid_tracker: OidTracker,
247 max_results: Option<usize>,
249 count: usize,
251 done: bool,
252 buffer: Vec<VarBind>,
254 buffer_idx: usize,
256 pending: Option<Pin<Box<dyn std::future::Future<Output = Result<Vec<VarBind>>> + Send>>>,
257}
258
259impl<T: Transport> BulkWalk<T> {
260 pub(crate) fn new(
261 client: Client<T>,
262 oid: Oid,
263 max_repetitions: i32,
264 ordering: OidOrdering,
265 max_results: Option<usize>,
266 ) -> Self {
267 Self {
268 client,
269 base_oid: oid.clone(),
270 current_oid: oid,
271 max_repetitions,
272 oid_tracker: OidTracker::new(ordering),
273 max_results,
274 count: 0,
275 done: false,
276 buffer: Vec::new(),
277 buffer_idx: 0,
278 pending: None,
279 }
280 }
281}
282
283impl<T: Transport + 'static> BulkWalk<T> {
284 pub async fn next(&mut self) -> Option<Result<VarBind>> {
286 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
287 }
288
289 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
291 let mut results = Vec::new();
292 while let Some(result) = self.next().await {
293 results.push(result?);
294 }
295 Ok(results)
296 }
297}
298
299impl<T: Transport + 'static> Stream for BulkWalk<T> {
300 type Item = Result<VarBind>;
301
302 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
303 loop {
304 if self.done {
305 return Poll::Ready(None);
306 }
307
308 if let Some(max) = self.max_results
310 && self.count >= max
311 {
312 self.done = true;
313 return Poll::Ready(None);
314 }
315
316 if self.buffer_idx < self.buffer.len() {
318 let vb = self.buffer[self.buffer_idx].clone();
319 self.buffer_idx += 1;
320
321 if matches!(vb.value, Value::EndOfMibView) {
323 self.done = true;
324 return Poll::Ready(None);
325 }
326
327 if !vb.oid.starts_with(&self.base_oid) {
329 self.done = true;
330 return Poll::Ready(None);
331 }
332
333 if let Err(e) = self.oid_tracker.check(&vb.oid) {
335 self.done = true;
336 return Poll::Ready(Some(Err(e)));
337 }
338
339 self.current_oid = vb.oid.clone();
341 self.count += 1;
342
343 return Poll::Ready(Some(Ok(vb)));
344 }
345
346 if self.pending.is_none() {
348 let client = self.client.clone();
349 let oid = self.current_oid.clone();
350 let max_rep = self.max_repetitions;
351
352 let fut = Box::pin(async move { client.get_bulk(&[oid], 0, max_rep).await });
353 self.pending = Some(fut);
354 }
355
356 let pending = self.pending.as_mut().unwrap();
358 match pending.as_mut().poll(cx) {
359 Poll::Pending => return Poll::Pending,
360 Poll::Ready(result) => {
361 self.pending = None;
362
363 match result {
364 Ok(varbinds) => {
365 if varbinds.is_empty() {
366 self.done = true;
367 return Poll::Ready(None);
368 }
369
370 self.buffer = varbinds;
371 self.buffer_idx = 0;
372 }
374 Err(e) => {
375 self.done = true;
376 return Poll::Ready(Some(Err(e)));
377 }
378 }
379 }
380 }
381 }
382 }
383}
384
385pub enum WalkStream<T: Transport> {
397 GetNext(Walk<T>),
399 GetBulk(BulkWalk<T>),
401}
402
403impl<T: Transport> WalkStream<T> {
404 pub(crate) fn new(
406 client: Client<T>,
407 oid: Oid,
408 version: Version,
409 walk_mode: WalkMode,
410 ordering: OidOrdering,
411 max_results: Option<usize>,
412 max_repetitions: i32,
413 ) -> Result<Self> {
414 let use_bulk = match walk_mode {
415 WalkMode::Auto => version != Version::V1,
416 WalkMode::GetNext => false,
417 WalkMode::GetBulk => {
418 if version == Version::V1 {
419 return Err(Error::GetBulkNotSupportedInV1);
420 }
421 true
422 }
423 };
424
425 Ok(if use_bulk {
426 WalkStream::GetBulk(BulkWalk::new(
427 client,
428 oid,
429 max_repetitions,
430 ordering,
431 max_results,
432 ))
433 } else {
434 WalkStream::GetNext(Walk::new(client, oid, ordering, max_results))
435 })
436 }
437}
438
439impl<T: Transport + 'static> WalkStream<T> {
440 pub async fn next(&mut self) -> Option<Result<VarBind>> {
442 std::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
443 }
444
445 pub async fn collect(mut self) -> Result<Vec<VarBind>> {
447 let mut results = Vec::new();
448 while let Some(result) = self.next().await {
449 results.push(result?);
450 }
451 Ok(results)
452 }
453}
454
455impl<T: Transport + 'static> Stream for WalkStream<T> {
456 type Item = Result<VarBind>;
457
458 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
459 match self.get_mut() {
461 WalkStream::GetNext(walk) => Pin::new(walk).poll_next(cx),
462 WalkStream::GetBulk(bulk_walk) => Pin::new(bulk_walk).poll_next(cx),
463 }
464 }
465}