Skip to main content

async_snmp/client/
mod.rs

1//! SNMP client implementation.
2
3mod auth;
4mod builder;
5mod retry;
6mod v3;
7mod walk;
8
9pub use auth::{Auth, CommunityVersion, UsmAuth, UsmBuilder};
10pub use builder::{ClientBuilder, Target};
11pub use retry::{Backoff, Retry, RetryBuilder};
12
13// New unified entry point
14impl Client<UdpHandle> {
15    /// Create a new SNMP client builder.
16    ///
17    /// This is the single entry point for client construction, supporting all
18    /// SNMP versions (v1, v2c, v3) through the [`Auth`] enum.
19    ///
20    /// # Example
21    ///
22    /// ```rust,no_run
23    /// use async_snmp::{Auth, Client, Retry};
24    /// use std::time::Duration;
25    ///
26    /// # async fn example() -> async_snmp::Result<()> {
27    /// // (host, port) tuple - convenient when host and port are separate
28    /// let client = Client::builder(("192.168.1.1", 161), Auth::v2c("public"))
29    ///     .connect().await?;
30    ///
31    /// // Combined address string (port defaults to 161 if omitted)
32    /// let client = Client::builder("switch.local", Auth::v2c("public"))
33    ///     .connect().await?;
34    ///
35    /// // SocketAddr works too
36    /// let addr: std::net::SocketAddr = "192.168.1.1:161".parse().unwrap();
37    /// let client = Client::builder(addr, Auth::v2c("public"))
38    ///     .connect().await?;
39    /// # Ok(())
40    /// # }
41    /// ```
42    pub fn builder(target: impl Into<Target>, auth: impl Into<Auth>) -> ClientBuilder {
43        ClientBuilder::new(target, auth)
44    }
45}
46use crate::error::internal::DecodeErrorKind;
47use crate::error::{Error, ErrorStatus, Result};
48use crate::message::{CommunityMessage, Message};
49use crate::oid::Oid;
50use crate::pdu::{GetBulkPdu, Pdu, TrapV1Pdu};
51use crate::transport::Transport;
52use crate::transport::UdpHandle;
53use crate::v3::{EngineCache, EngineState, SaltCounter};
54use crate::value::Value;
55use crate::varbind::VarBind;
56use crate::version::Version;
57use bytes::Bytes;
58use std::net::SocketAddr;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::sync::RwLock;
62use std::time::{Duration, Instant};
63use tokio::sync::Mutex as AsyncMutex;
64use tracing::{Span, instrument};
65
66pub use crate::notification::{DerivedKeys, UsmConfig};
67pub use walk::{BulkWalk, OidOrdering, Walk, WalkMode, WalkStream};
68
69// ============================================================================
70// Shared helpers
71// ============================================================================
72
73/// Extract an SNMP-level error from a PDU and convert it to an `Error::Snmp`.
74///
75/// Returns `Some(err)` if the PDU carries an SNMP error status, `None` otherwise.
76/// The `error_index` field is 1-based; 0 means the error applies to the whole PDU.
77pub(crate) fn pdu_to_snmp_error(pdu: &Pdu, target: SocketAddr) -> Option<Box<Error>> {
78    if !pdu.is_error() {
79        return None;
80    }
81    let status = pdu.error_status_enum();
82    let oid = (pdu.error_index as usize)
83        .checked_sub(1)
84        .and_then(|idx| pdu.varbinds.get(idx))
85        .map(|vb| vb.oid.clone());
86    Some(
87        Error::Snmp {
88            target,
89            status,
90            index: pdu.error_index.max(0) as u32,
91            oid,
92        }
93        .boxed(),
94    )
95}
96
97// ============================================================================
98// Default configuration constants
99// ============================================================================
100
/// Default timeout for SNMP requests.
///
/// Used as the default for [`ClientConfig::timeout`]. The configured timeout
/// is applied to each send attempt, not to the request as a whole.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Default maximum OIDs per request.
///
/// Used as the default for [`ClientConfig::max_oids_per_request`].
/// GET / GETNEXT requests with more OIDs than this limit are automatically
/// split into multiple batches; SET requests over the limit are rejected
/// instead, because a SET must stay within one PDU.
pub const DEFAULT_MAX_OIDS_PER_REQUEST: usize = 10;

/// Default max-repetitions for GETBULK operations.
///
/// Used as the default for [`ClientConfig::max_repetitions`].
/// Controls how many values are requested per GETBULK PDU during walks.
pub const DEFAULT_MAX_REPETITIONS: u32 = 25;
114
/// SNMP client.
///
/// Generic over transport type, with `UdpHandle` as default.
///
/// A `Client` is a handle around reference-counted shared state: cloning it
/// only bumps the reference count, and all clones use the same transport and
/// configuration.
pub struct Client<T: Transport = UdpHandle> {
    /// Shared state (transport, configuration and V3 bookkeeping).
    inner: Arc<ClientInner<T>>,
}
121
122impl<T: Transport> Clone for Client<T> {
123    fn clone(&self) -> Self {
124        Self {
125            inner: Arc::clone(&self.inner),
126        }
127    }
128}
129
/// State shared by every clone of a [`Client`].
///
/// Held behind an `Arc`; mutable pieces use interior locking.
struct ClientInner<T: Transport> {
    /// Transport used to send requests and receive responses.
    transport: T,
    /// Configuration this client was constructed with.
    config: ClientConfig,
    /// Cached engine state (V3)
    engine_state: RwLock<Option<EngineState>>,
    /// Derived keys for this engine (V3)
    derived_keys: RwLock<Option<DerivedKeys>>,
    /// Salt counter for privacy (V3)
    salt_counter: SaltCounter,
    /// Shared engine cache (V3, optional)
    ///
    /// `None` for clients built with `Client::new`, `Some` for clients built
    /// with `Client::with_engine_cache`.
    engine_cache: Option<Arc<EngineCache>>,
    /// Serializes concurrent discovery attempts so only one runs at a time.
    discovery_lock: AsyncMutex<()>,
    /// Local engine start time for computing engine time in V3 traps.
    ///
    /// Captured when the client is constructed.
    local_engine_start: Instant,
    /// Keys derived against local_engine_id for V3 trap sending.
    local_derived_keys: RwLock<Option<DerivedKeys>>,
}
148
/// Client configuration.
///
/// Most users should use [`ClientBuilder`] rather than constructing this directly.
///
/// All fields are public; [`ClientConfig::default`] gives an SNMPv2c setup
/// with community "public".
#[derive(Clone)]
pub struct ClientConfig {
    /// SNMP version (default: V2c)
    ///
    /// Responses whose version differs from this value are rejected as
    /// malformed.
    pub version: Version,
    /// Community string for v1/v2c (default: "public")
    pub community: Bytes,
    /// Request timeout (default: 5 seconds)
    ///
    /// Applied to each send attempt individually: the deadline is re-armed
    /// before every (re)transmission.
    pub timeout: Duration,
    /// Retry configuration (default: 3 retries, 1-second delay)
    ///
    /// Only timeouts are retried. When the transport reports itself as
    /// reliable, a request is sent exactly once regardless of this setting.
    pub retry: Retry,
    /// Maximum OIDs per request (default: 10)
    ///
    /// GET / GETNEXT batches larger than this are split into several
    /// requests; `set_many` refuses input larger than this instead of
    /// splitting it.
    pub max_oids_per_request: usize,
    /// SNMPv3 security configuration (default: None)
    ///
    /// The V3 request path is used only when `version` is V3 and this is
    /// `Some`.
    pub v3_security: Option<UsmConfig>,
    /// Walk operation mode (default: Auto)
    pub walk_mode: WalkMode,
    /// OID ordering behavior during walk operations (default: Strict)
    pub oid_ordering: OidOrdering,
    /// Maximum results from a single walk operation (default: None/unlimited)
    pub max_walk_results: Option<usize>,
    /// Max-repetitions for GETBULK operations (default: 25)
    pub max_repetitions: u32,
    /// Local engine ID for V3 trap sending (default: None).
    ///
    /// Per RFC 3412 Section 6.4, the sender is the authoritative engine for
    /// trap PDUs. This engine ID is used to localize keys for outbound V3 traps.
    pub local_engine_id: Option<Bytes>,
    /// Local engine boots base value for V3 trap sending (default: 1).
    pub local_engine_boots: u32,
}
182
183impl Default for ClientConfig {
184    /// Returns configuration for SNMPv2c with community "public".
185    ///
186    /// See field documentation for all default values.
187    fn default() -> Self {
188        Self {
189            version: Version::V2c,
190            community: Bytes::from_static(b"public"),
191            timeout: DEFAULT_TIMEOUT,
192            retry: Retry::default(),
193            max_oids_per_request: DEFAULT_MAX_OIDS_PER_REQUEST,
194            v3_security: None,
195            walk_mode: WalkMode::Auto,
196            oid_ordering: OidOrdering::Strict,
197            max_walk_results: None,
198            max_repetitions: DEFAULT_MAX_REPETITIONS,
199            local_engine_id: None,
200            local_engine_boots: 1,
201        }
202    }
203}
204
205impl<T: Transport> Client<T> {
206    /// Create a new client with the given transport and config.
207    ///
208    /// For most use cases, prefer [`Client::builder()`] which provides a more
209    /// ergonomic API. Use this constructor when you need fine-grained control
210    /// over transport configuration (e.g., TCP connection timeout, keepalive
211    /// settings) or when using a custom [`Transport`] implementation.
212    pub fn new(transport: T, config: ClientConfig) -> Self {
213        Self {
214            inner: Arc::new(ClientInner {
215                transport,
216                config,
217                engine_state: RwLock::new(None),
218                derived_keys: RwLock::new(None),
219                salt_counter: SaltCounter::new(),
220                engine_cache: None,
221                discovery_lock: AsyncMutex::new(()),
222                local_engine_start: Instant::now(),
223                local_derived_keys: RwLock::new(None),
224            }),
225        }
226    }
227
228    /// Create a new V3 client with a shared engine cache.
229    pub fn with_engine_cache(
230        transport: T,
231        config: ClientConfig,
232        engine_cache: Arc<EngineCache>,
233    ) -> Self {
234        Self {
235            inner: Arc::new(ClientInner {
236                transport,
237                config,
238                engine_state: RwLock::new(None),
239                derived_keys: RwLock::new(None),
240                salt_counter: SaltCounter::new(),
241                engine_cache: Some(engine_cache),
242                discovery_lock: AsyncMutex::new(()),
243                local_engine_start: Instant::now(),
244                local_derived_keys: RwLock::new(None),
245            }),
246        }
247    }
248
249    /// Get the peer (target) address.
250    ///
251    /// Returns the remote address that this client sends requests to.
252    /// Named to match [`std::net::TcpStream::peer_addr()`].
253    pub fn peer_addr(&self) -> SocketAddr {
254        self.inner.transport.peer_addr()
255    }
256
257    /// Generate next request ID.
258    ///
259    /// Uses the transport's allocator (backed by a global counter).
260    fn next_request_id(&self) -> i32 {
261        self.inner.transport.alloc_request_id()
262    }
263
264    /// Check if using V3 with authentication/encryption configured.
265    fn is_v3(&self) -> bool {
266        self.inner.config.version == Version::V3 && self.inner.config.v3_security.is_some()
267    }
268
269    /// Send a request and wait for response (internal helper with pre-encoded data).
270    #[instrument(
271        level = "debug",
272        skip(self, data),
273        fields(
274            snmp.target = %self.peer_addr(),
275            snmp.request_id = request_id,
276            snmp.attempt = tracing::field::Empty,
277            snmp.elapsed_ms = tracing::field::Empty,
278        )
279    )]
280    async fn send_and_recv(&self, request_id: i32, data: &[u8]) -> Result<Pdu> {
281        let start = Instant::now();
282        let mut last_error: Option<Box<Error>> = None;
283        let max_attempts = if self.inner.transport.is_reliable() {
284            0
285        } else {
286            self.inner.config.retry.max_attempts
287        };
288
289        for attempt in 0..=max_attempts {
290            Span::current().record("snmp.attempt", attempt);
291            if attempt > 0 {
292                tracing::debug!(target: "async_snmp::client", "retrying request");
293            }
294
295            // Register (or re-register) with fresh deadline before sending
296            self.inner
297                .transport
298                .register_request(request_id, self.inner.config.timeout);
299
300            // Send request
301            tracing::trace!(target: "async_snmp::client", { snmp.bytes = data.len() }, "sending request");
302            self.inner.transport.send(data).await?;
303
304            // Wait for response (deadline was set by register_request)
305            match self.inner.transport.recv(request_id).await {
306                Ok((response_data, _source)) => {
307                    tracing::trace!(target: "async_snmp::client", { snmp.bytes = response_data.len() }, "received response");
308
309                    // Decode response and extract PDU
310                    let response = Message::decode(response_data)?;
311
312                    // Validate response version matches request version
313                    let response_version = response.version();
314                    let expected_version = self.inner.config.version;
315                    if response_version != expected_version {
316                        tracing::warn!(target: "async_snmp::client", { ?expected_version, ?response_version, peer = %self.peer_addr() }, "version mismatch in response");
317                        return Err(Error::MalformedResponse {
318                            target: self.peer_addr(),
319                        }
320                        .boxed());
321                    }
322
323                    let response_pdu = match response.into_pdu() {
324                        Some(p) => p,
325                        None => {
326                            tracing::warn!(target: "async_snmp::client", { peer = %self.peer_addr() }, "received TrapV1 in response to request");
327                            return Err(Error::MalformedResponse {
328                                target: self.peer_addr(),
329                            }
330                            .boxed());
331                        }
332                    };
333
334                    // Validate request ID
335                    if response_pdu.request_id != request_id {
336                        tracing::warn!(target: "async_snmp::client", { expected_request_id = request_id, actual_request_id = response_pdu.request_id, peer = %self.peer_addr() }, "request ID mismatch in response");
337                        return Err(Error::MalformedResponse {
338                            target: self.peer_addr(),
339                        }
340                        .boxed());
341                    }
342
343                    // Check for SNMP error
344                    if let Some(err) = pdu_to_snmp_error(&response_pdu, self.peer_addr()) {
345                        Span::current()
346                            .record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
347                        return Err(err);
348                    }
349
350                    Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
351                    return Ok(response_pdu);
352                }
353                Err(e) if matches!(*e, Error::Timeout { .. }) => {
354                    last_error = Some(e);
355                    // Apply backoff delay before next retry (if not last attempt)
356                    if attempt < max_attempts {
357                        let delay = self.inner.config.retry.compute_delay(attempt);
358                        if !delay.is_zero() {
359                            tracing::debug!(target: "async_snmp::client", { delay_ms = delay.as_millis() as u64 }, "backing off");
360                            tokio::time::sleep(delay).await;
361                        }
362                    }
363                    continue;
364                }
365                Err(e) => {
366                    Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
367                    return Err(e);
368                }
369            }
370        }
371
372        // All retries exhausted
373        let elapsed = start.elapsed();
374        Span::current().record("snmp.elapsed_ms", elapsed.as_millis() as u64);
375        tracing::debug!(target: "async_snmp::client", { request_id, peer = %self.peer_addr(), ?elapsed, retries = max_attempts }, "request timed out");
376        Err(last_error.unwrap_or_else(|| {
377            Error::Timeout {
378                target: self.peer_addr(),
379                elapsed,
380                retries: max_attempts,
381            }
382            .boxed()
383        }))
384    }
385
386    /// Send a standard request (GET, GETNEXT, SET) and wait for response.
387    async fn send_request(&self, pdu: Pdu) -> Result<Pdu> {
388        // Dispatch to V3 handler if configured
389        if self.is_v3() {
390            return self.send_v3_and_recv(pdu).await;
391        }
392
393        tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?pdu.pdu_type, snmp.varbind_count = pdu.varbinds.len() }, "sending {} request", pdu.pdu_type);
394
395        let request_id = pdu.request_id;
396        let message = CommunityMessage::new(
397            self.inner.config.version,
398            self.inner.config.community.clone(),
399            pdu,
400        );
401        let data = message.encode();
402        let response = self.send_and_recv(request_id, &data).await?;
403
404        tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
405
406        Ok(response)
407    }
408
409    /// Send a GETBULK request and wait for response.
410    async fn send_bulk_request(&self, pdu: GetBulkPdu) -> Result<Pdu> {
411        // Dispatch to V3 handler if configured
412        if self.is_v3() {
413            // Convert GetBulkPdu to Pdu for V3 encoding
414            let pdu = Pdu::get_bulk(
415                pdu.request_id,
416                pdu.non_repeaters,
417                pdu.max_repetitions,
418                pdu.varbinds,
419            );
420            return self.send_v3_and_recv(pdu).await;
421        }
422
423        tracing::debug!(target: "async_snmp::client", { snmp.non_repeaters = pdu.non_repeaters, snmp.max_repetitions = pdu.max_repetitions, snmp.varbind_count = pdu.varbinds.len() }, "sending GetBulkRequest");
424
425        let request_id = pdu.request_id;
426        let data = CommunityMessage::encode_bulk(
427            self.inner.config.version,
428            self.inner.config.community.clone(),
429            &pdu,
430        );
431        let response = self.send_and_recv(request_id, &data).await?;
432
433        tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
434
435        Ok(response)
436    }
437
438    /// GET a single OID.
439    #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
440    pub async fn get(&self, oid: &Oid) -> Result<VarBind> {
441        let request_id = self.next_request_id();
442        let pdu = Pdu::get_request(request_id, std::slice::from_ref(oid));
443        let response = self.send_request(pdu).await?;
444
445        response.varbinds.into_iter().next().ok_or_else(|| {
446            tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GET response");
447            Error::MalformedResponse {
448                target: self.peer_addr(),
449            }
450            .boxed()
451        })
452    }
453
454    /// GET multiple OIDs.
455    ///
456    /// If the OID list exceeds `max_oids_per_request`, the request is
457    /// automatically split into multiple batches. Results are returned
458    /// in the same order as the input OIDs.
459    ///
460    /// # Example
461    ///
462    /// ```rust,no_run
463    /// # use async_snmp::{Auth, Client, oid};
464    /// # async fn example() -> async_snmp::Result<()> {
465    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
466    /// let results = client.get_many(&[
467    ///     oid!(1, 3, 6, 1, 2, 1, 1, 1, 0),  // sysDescr
468    ///     oid!(1, 3, 6, 1, 2, 1, 1, 3, 0),  // sysUpTime
469    ///     oid!(1, 3, 6, 1, 2, 1, 1, 5, 0),  // sysName
470    /// ]).await?;
471    /// # Ok(())
472    /// # }
473    /// ```
474    #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
475    pub async fn get_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
476        self.get_or_getnext_many(oids, "GET", Pdu::get_request)
477            .await
478    }
479
480    /// GETNEXT for a single OID.
481    #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
482    pub async fn get_next(&self, oid: &Oid) -> Result<VarBind> {
483        let request_id = self.next_request_id();
484        let pdu = Pdu::get_next_request(request_id, std::slice::from_ref(oid));
485        let response = self.send_request(pdu).await?;
486
487        response.varbinds.into_iter().next().ok_or_else(|| {
488            tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GETNEXT response");
489            Error::MalformedResponse {
490                target: self.peer_addr(),
491            }
492            .boxed()
493        })
494    }
495
496    /// GETNEXT for multiple OIDs.
497    ///
498    /// If the OID list exceeds `max_oids_per_request`, the request is
499    /// automatically split into multiple batches. Results are returned
500    /// in the same order as the input OIDs.
501    ///
502    /// # Example
503    ///
504    /// ```rust,no_run
505    /// # use async_snmp::{Auth, Client, oid};
506    /// # async fn example() -> async_snmp::Result<()> {
507    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
508    /// let results = client.get_next_many(&[
509    ///     oid!(1, 3, 6, 1, 2, 1, 2, 2, 1, 2),  // ifDescr
510    ///     oid!(1, 3, 6, 1, 2, 1, 2, 2, 1, 3),  // ifType
511    /// ]).await?;
512    /// # Ok(())
513    /// # }
514    /// ```
515    #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
516    pub async fn get_next_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
517        self.get_or_getnext_many(oids, "GETNEXT", Pdu::get_next_request)
518            .await
519    }
520
521    /// Shared implementation for GET-many and GETNEXT-many.
522    ///
523    /// `op` is the PDU constructor (`Pdu::get_request` or `Pdu::get_next_request`).
524    /// `op_name` is used only for log messages.
525    async fn get_or_getnext_many(
526        &self,
527        oids: &[Oid],
528        op_name: &'static str,
529        op: fn(i32, &[Oid]) -> Pdu,
530    ) -> Result<Vec<VarBind>> {
531        if oids.is_empty() {
532            return Ok(Vec::new());
533        }
534
535        let max_per_request = self.inner.config.max_oids_per_request;
536        let mut all_results = Vec::with_capacity(oids.len());
537
538        for chunk in oids.chunks(max_per_request) {
539            self.send_batch_with_bisect(chunk, op_name, op, &mut all_results)
540                .await?;
541        }
542
543        Ok(all_results)
544    }
545
546    /// Send a batch of OIDs, automatically bisecting on tooBig errors.
547    ///
548    /// If the agent returns tooBig for a batch with more than one OID, the batch
549    /// is split in half and each half is retried. This repeats recursively until
550    /// batches succeed or a single-OID request fails (which is unrecoverable).
551    fn send_batch_with_bisect<'a>(
552        &'a self,
553        oids: &'a [Oid],
554        op_name: &'static str,
555        op: fn(i32, &[Oid]) -> Pdu,
556        results: &'a mut Vec<VarBind>,
557    ) -> Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
558        Box::pin(async move {
559            let request_id = self.next_request_id();
560            let pdu = op(request_id, oids);
561            match self.send_request(pdu).await {
562                Ok(response) => {
563                    if response.varbinds.len() > oids.len() {
564                        tracing::warn!(target: "async_snmp::client", { peer = %self.peer_addr(), expected = oids.len(), actual = response.varbinds.len(), snmp.op = op_name }, "response has more varbinds than requested");
565                        return Err(Error::MalformedResponse {
566                            target: self.peer_addr(),
567                        }
568                        .boxed());
569                    } else if response.varbinds.len() < oids.len() {
570                        tracing::warn!(target: "async_snmp::client", { peer = %self.peer_addr(), expected = oids.len(), actual = response.varbinds.len(), snmp.op = op_name }, "response has fewer varbinds than requested");
571                    }
572                    results.extend(response.varbinds);
573                    Ok(())
574                }
575                Err(e)
576                    if oids.len() > 1
577                        && matches!(
578                            &*e,
579                            Error::Snmp {
580                                status: ErrorStatus::TooBig,
581                                ..
582                            }
583                        ) =>
584                {
585                    let mid = oids.len() / 2;
586                    tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), snmp.batch_size = oids.len(), snmp.split_at = mid, snmp.op = op_name }, "tooBig response, bisecting batch");
587                    self.send_batch_with_bisect(&oids[..mid], op_name, op, results)
588                        .await?;
589                    self.send_batch_with_bisect(&oids[mid..], op_name, op, results)
590                        .await?;
591                    Ok(())
592                }
593                Err(e) => Err(e),
594            }
595        })
596    }
597
598    /// SET a single OID.
599    #[instrument(skip(self, value), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
600    pub async fn set(&self, oid: &Oid, value: Value) -> Result<VarBind> {
601        let request_id = self.next_request_id();
602        let varbind = VarBind::new(oid.clone(), value);
603        let pdu = Pdu::set_request(request_id, vec![varbind]);
604        let response = self.send_request(pdu).await?;
605
606        response.varbinds.into_iter().next().ok_or_else(|| {
607            tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty SET response");
608            Error::MalformedResponse {
609                target: self.peer_addr(),
610            }
611            .boxed()
612        })
613    }
614
615    /// SET multiple OIDs in a single atomic PDU.
616    ///
617    /// RFC 3416 requires that a SET request be atomic: either all variables
618    /// in the request are set, or none are. To preserve this guarantee,
619    /// `set_many` refuses to split the varbind list across multiple PDUs.
620    ///
621    /// If `varbinds.len()` exceeds `max_oids_per_request`, this method
622    /// returns `Error::Config` rather than silently batching the request.
623    /// Callers that need to set more variables than the per-request limit
624    /// must issue multiple explicit `set_many` calls and handle partial
625    /// failure themselves.
626    ///
627    /// # Example
628    ///
629    /// ```rust,no_run
630    /// # use async_snmp::{Auth, Client, oid, Value};
631    /// # async fn example() -> async_snmp::Result<()> {
632    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("private")).connect().await?;
633    /// let results = client.set_many(&[
634    ///     (oid!(1, 3, 6, 1, 2, 1, 1, 5, 0), Value::from("new-hostname")),
635    ///     (oid!(1, 3, 6, 1, 2, 1, 1, 6, 0), Value::from("new-location")),
636    /// ]).await?;
637    /// # Ok(())
638    /// # }
639    /// ```
640    #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = varbinds.len()))]
641    pub async fn set_many(&self, varbinds: &[(Oid, Value)]) -> Result<Vec<VarBind>> {
642        if varbinds.is_empty() {
643            return Ok(Vec::new());
644        }
645
646        let max_per_request = self.inner.config.max_oids_per_request;
647
648        if varbinds.len() > max_per_request {
649            return Err(Error::Config(
650                format!(
651                    "set_many: {} varbinds exceeds max_oids_per_request ({}); \
652                     SET must be atomic and cannot be split across PDUs",
653                    varbinds.len(),
654                    max_per_request,
655                )
656                .into(),
657            )
658            .boxed());
659        }
660
661        let request_id = self.next_request_id();
662        let vbs: Vec<VarBind> = varbinds
663            .iter()
664            .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
665            .collect();
666        let expected_count = vbs.len();
667        let pdu = Pdu::set_request(request_id, vbs);
668        let response = self.send_request(pdu).await?;
669        if response.varbinds.len() > expected_count {
670            tracing::warn!(target: "async_snmp::client", { peer = %self.peer_addr(), expected = expected_count, actual = response.varbinds.len() }, "SET response has more varbinds than requested");
671            return Err(Error::MalformedResponse {
672                target: self.peer_addr(),
673            }
674            .boxed());
675        } else if response.varbinds.len() < expected_count {
676            tracing::warn!(target: "async_snmp::client", { peer = %self.peer_addr(), expected = expected_count, actual = response.varbinds.len() }, "SET response has fewer varbinds than requested");
677        }
678        Ok(response.varbinds)
679    }
680
681    /// Send a trap (fire-and-forget).
682    ///
683    /// For V1 clients: constructs a TrapV1 PDU. The `trap_oid` is reverse-mapped
684    /// to v1 generic_trap/specific_trap/enterprise fields per RFC 3584 Section 3.2.
685    /// The agent_addr is set from the transport's local IPv4 address, or `[0,0,0,0]`
686    /// if the local address is IPv6. Use [`send_v1_trap`](Self::send_v1_trap) for
687    /// explicit control over v1 fields.
688    ///
689    /// For V2c/V3 clients: constructs a TrapV2 PDU with the mandatory sysUpTime.0
690    /// and snmpTrapOID.0 prefix.
691    ///
692    /// For V3: uses the local engine ID (set via `ClientBuilder::local_engine_id`).
693    ///
694    /// # Arguments
695    ///
696    /// * `trap_oid` - The trap OID (snmpTrapOID.0 value)
697    /// * `uptime` - sysUpTime.0 value in hundredths of seconds
698    /// * `varbinds` - Additional variable bindings (appended after the prefix)
699    #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.trap_oid = %trap_oid))]
700    pub async fn send_trap(
701        &self,
702        trap_oid: &Oid,
703        uptime: u32,
704        varbinds: Vec<VarBind>,
705    ) -> Result<()> {
706        if self.inner.config.version == Version::V1 {
707            // Build a v2-style PDU and convert to v1.
708            // Per RFC 3584 Section 3, use the local IPv4 address as agent_addr.
709            let local_ip = match self.inner.transport.local_addr().ip() {
710                std::net::IpAddr::V4(v4) => v4.octets(),
711                std::net::IpAddr::V6(_) => [0, 0, 0, 0],
712            };
713            // request_id is unused in the v1 wire format, use 0 to avoid
714            // wasting a slot in the request_id sequence.
715            let pdu = Pdu::trap_v2(0, uptime, trap_oid, varbinds);
716            let trap = pdu.to_v1_trap(local_ip).ok_or_else(|| {
717                Error::Config("cannot convert trap to v1 (Counter64 varbind?)".into()).boxed()
718            })?;
719            return self.send_v1_trap(trap).await;
720        }
721
722        let request_id = self.next_request_id();
723        let pdu = Pdu::trap_v2(request_id, uptime, trap_oid, varbinds);
724
725        if self.is_v3() {
726            self.ensure_local_keys_derived()?;
727            let msg_id = self.next_request_id();
728            let data = self.build_v3_trap_message(&pdu, msg_id)?;
729            tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = "TrapV2", snmp.varbind_count = pdu.varbinds.len(), snmp.bytes = data.len() }, "sending V3 trap");
730            self.inner.transport.send(&data).await?;
731        } else {
732            let message = CommunityMessage::new(
733                self.inner.config.version,
734                self.inner.config.community.clone(),
735                pdu,
736            );
737            let data = message.encode();
738            tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = "TrapV2", snmp.bytes = data.len() }, "sending v2c trap");
739            self.inner.transport.send(&data).await?;
740        }
741
742        Ok(())
743    }
744
745    /// Send an SNMPv1 trap with explicit v1 PDU fields.
746    ///
747    /// This is a lower-level method that accepts a pre-built [`TrapV1Pdu`],
748    /// giving full control over enterprise OID, agent_addr, generic_trap,
749    /// specific_trap, and time_stamp fields.
750    ///
751    /// The client must be configured for V1 (`Auth::v1()`). Returns an error
752    /// if the client version is not V1.
753    ///
754    /// # Example
755    ///
756    /// ```rust,no_run
757    /// # use async_snmp::{Auth, Client, TrapV1Pdu, GenericTrap, oid};
758    /// # async fn example() -> async_snmp::Result<()> {
759    /// let client = Client::builder("192.168.1.100:162", Auth::v1("public"))
760    ///     .connect().await?;
761    ///
762    /// let trap = TrapV1Pdu::new(
763    ///     oid!(1, 3, 6, 1, 4, 1, 9999),  // enterprise
764    ///     [192, 168, 1, 1],               // agent address
765    ///     GenericTrap::ColdStart,
766    ///     0,
767    ///     12345,                          // uptime in centiseconds
768    ///     vec![],
769    /// );
770    /// client.send_v1_trap(trap).await?;
771    /// # Ok(())
772    /// # }
773    /// ```
774    #[instrument(skip(self, trap), err, fields(snmp.target = %self.peer_addr(), snmp.generic_trap = %trap.generic_trap))]
775    pub async fn send_v1_trap(&self, trap: TrapV1Pdu) -> Result<()> {
776        if self.inner.config.version != Version::V1 {
777            return Err(Error::Config("send_v1_trap requires a V1 client".into()).boxed());
778        }
779
780        let message = CommunityMessage::v1_trap(self.inner.config.community.clone(), trap);
781        let data = message.encode();
782        tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = "TrapV1", snmp.bytes = data.len() }, "sending v1 trap");
783        self.inner.transport.send(&data).await?;
784
785        Ok(())
786    }
787
788    /// Send a v2c/v3 inform and wait for acknowledgement.
789    ///
790    /// Constructs an InformRequest PDU with the mandatory sysUpTime.0 and
791    /// snmpTrapOID.0 prefix, sends it to the target, and waits for a Response
792    /// PDU. Uses the same retry and timeout logic as other request types.
793    ///
794    /// For V3: uses engine discovery against the receiver (same as GET/SET).
795    /// V1 is not supported and returns an error.
796    ///
797    /// # Arguments
798    ///
799    /// * `trap_oid` - The trap OID (snmpTrapOID.0 value)
800    /// * `uptime` - sysUpTime.0 value in hundredths of seconds
801    /// * `varbinds` - Additional variable bindings (appended after the prefix)
802    #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.trap_oid = %trap_oid))]
803    pub async fn send_inform(
804        &self,
805        trap_oid: &Oid,
806        uptime: u32,
807        varbinds: Vec<VarBind>,
808    ) -> Result<()> {
809        if self.inner.config.version == Version::V1 {
810            return Err(Error::Config("v1 inform sending not supported".into()).boxed());
811        }
812
813        let request_id = self.next_request_id();
814        let pdu = Pdu::inform_request(request_id, uptime, trap_oid, varbinds);
815        let _response = self.send_request(pdu).await?;
816        Ok(())
817    }
818
819    /// GETBULK request (SNMPv2c/v3 only).
820    ///
821    /// Efficiently retrieves multiple variable bindings in a single request.
822    /// GETBULK splits the requested OIDs into two groups:
823    ///
824    /// - **Non-repeaters** (first N OIDs): Each gets a single GETNEXT, returning
825    ///   one value per OID. Use for scalar values like `sysUpTime.0`.
826    /// - **Repeaters** (remaining OIDs): Each gets up to `max_repetitions` GETNEXTs,
827    ///   returning multiple values per OID. Use for walking table columns.
828    ///
829    /// # Arguments
830    ///
831    /// * `oids` - OIDs to retrieve
832    /// * `non_repeaters` - How many OIDs (from the start) are non-repeating
833    /// * `max_repetitions` - Maximum rows to return for each repeating OID
834    ///
835    /// # Example
836    ///
837    /// ```rust,no_run
838    /// # use async_snmp::{Auth, Client, oid};
839    /// # async fn example() -> async_snmp::Result<()> {
840    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
841    /// // Get sysUpTime (non-repeater) plus 10 interface descriptions (repeater)
842    /// let results = client.get_bulk(
843    ///     &[oid!(1, 3, 6, 1, 2, 1, 1, 3, 0), oid!(1, 3, 6, 1, 2, 1, 2, 2, 1, 2)],
844    ///     1,  // first OID is non-repeating
845    ///     10, // get up to 10 values for the second OID
846    /// ).await?;
847    /// // Results: [sysUpTime value, ifDescr.1, ifDescr.2, ..., ifDescr.10]
848    /// # Ok(())
849    /// # }
850    /// ```
851    #[instrument(skip(self, oids), err, fields(
852        snmp.target = %self.peer_addr(),
853        snmp.oid_count = oids.len(),
854        snmp.non_repeaters = non_repeaters,
855        snmp.max_repetitions = max_repetitions
856    ))]
857    pub async fn get_bulk(
858        &self,
859        oids: &[Oid],
860        non_repeaters: i32,
861        max_repetitions: i32,
862    ) -> Result<Vec<VarBind>> {
863        let request_id = self.next_request_id();
864        let pdu = GetBulkPdu::new(request_id, non_repeaters, max_repetitions, oids);
865        let response = self.send_bulk_request(pdu).await?;
866        Ok(response.varbinds)
867    }
868
869    /// Walk an OID subtree.
870    ///
871    /// Auto-selects the optimal walk method based on SNMP version and `WalkMode`:
872    /// - `WalkMode::Auto` (default): Uses GETNEXT for V1, GETBULK for V2c/V3
873    /// - `WalkMode::GetNext`: Always uses GETNEXT
874    /// - `WalkMode::GetBulk`: Always uses GETBULK (fails on V1)
875    ///
876    /// Returns an async stream that yields each variable binding in the subtree.
877    /// The walk terminates when an OID outside the subtree is encountered or
878    /// when `EndOfMibView` is returned.
879    ///
880    /// Uses the client's configured `oid_ordering`, `max_walk_results`, and
881    /// `max_repetitions` (for GETBULK) settings.
882    ///
883    /// # Example
884    ///
885    /// ```rust,no_run
886    /// # use async_snmp::{Auth, Client, oid};
887    /// # async fn example() -> async_snmp::Result<()> {
888    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
889    /// // Auto-selects GETBULK for V2c/V3, GETNEXT for V1
890    /// let results = client.walk(oid!(1, 3, 6, 1, 2, 1, 1))?.collect().await?;
891    /// # Ok(())
892    /// # }
893    /// ```
894    #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
895    pub fn walk(&self, oid: Oid) -> Result<WalkStream<T>>
896    where
897        T: 'static,
898    {
899        let ordering = self.inner.config.oid_ordering;
900        let max_results = self.inner.config.max_walk_results;
901        let walk_mode = self.inner.config.walk_mode;
902        let max_repetitions = self.inner.config.max_repetitions as i32;
903        let version = self.inner.config.version;
904
905        WalkStream::new(
906            self.clone(),
907            oid,
908            version,
909            walk_mode,
910            ordering,
911            max_results,
912            max_repetitions,
913        )
914    }
915
916    /// Walk an OID subtree using GETNEXT.
917    ///
918    /// This method always uses GETNEXT regardless of the client's `WalkMode` configuration.
919    /// For auto-selection based on version and mode, use [`walk()`](Self::walk) instead.
920    ///
921    /// Returns an async stream that yields each variable binding in the subtree.
922    /// The walk terminates when an OID outside the subtree is encountered or
923    /// when `EndOfMibView` is returned.
924    ///
925    /// Uses the client's configured `oid_ordering` and `max_walk_results` settings.
926    ///
927    /// # Example
928    ///
929    /// ```rust,no_run
930    /// # use async_snmp::{Auth, Client, oid};
931    /// # async fn example() -> async_snmp::Result<()> {
932    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
933    /// // Force GETNEXT even for V2c/V3 clients
934    /// let results = client.walk_getnext(oid!(1, 3, 6, 1, 2, 1, 1)).collect().await?;
935    /// # Ok(())
936    /// # }
937    /// ```
938    #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
939    pub fn walk_getnext(&self, oid: Oid) -> Walk<T>
940    where
941        T: 'static,
942    {
943        let ordering = self.inner.config.oid_ordering;
944        let max_results = self.inner.config.max_walk_results;
945        Walk::new(self.clone(), oid, ordering, max_results)
946    }
947
948    /// Walk an OID subtree using GETBULK (more efficient than GETNEXT).
949    ///
950    /// Returns an async stream that yields each variable binding in the subtree.
951    /// Uses GETBULK internally with `non_repeaters=0`, fetching `max_repetitions`
952    /// values per request for efficient table traversal.
953    ///
954    /// Uses the client's configured `oid_ordering` and `max_walk_results` settings.
955    ///
956    /// # Arguments
957    ///
958    /// * `oid` - The base OID of the subtree to walk
959    /// * `max_repetitions` - How many OIDs to fetch per request
960    ///
961    /// # Example
962    ///
963    /// ```rust,no_run
964    /// # use async_snmp::{Auth, Client, oid};
965    /// # async fn example() -> async_snmp::Result<()> {
966    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
967    /// // Walk the interfaces table efficiently
968    /// let walk = client.bulk_walk(oid!(1, 3, 6, 1, 2, 1, 2, 2), 25);
969    /// // Process with futures StreamExt
970    /// # Ok(())
971    /// # }
972    /// ```
973    #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid, snmp.max_repetitions = max_repetitions))]
974    pub fn bulk_walk(&self, oid: Oid, max_repetitions: i32) -> BulkWalk<T>
975    where
976        T: 'static,
977    {
978        let ordering = self.inner.config.oid_ordering;
979        let max_results = self.inner.config.max_walk_results;
980        BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
981    }
982
983    /// Walk an OID subtree using the client's configured `max_repetitions`.
984    ///
985    /// This is a convenience method that uses the client's `max_repetitions` setting
986    /// (default: 25) instead of requiring it as a parameter.
987    ///
988    /// # Example
989    ///
990    /// ```rust,no_run
991    /// # use async_snmp::{Auth, Client, oid};
992    /// # async fn example() -> async_snmp::Result<()> {
993    /// # let client = Client::builder("127.0.0.1:161", Auth::v2c("public")).connect().await?;
994    /// // Walk using configured max_repetitions
995    /// let walk = client.bulk_walk_default(oid!(1, 3, 6, 1, 2, 1, 2, 2));
996    /// // Process with futures StreamExt
997    /// # Ok(())
998    /// # }
999    /// ```
1000    #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
1001    pub fn bulk_walk_default(&self, oid: Oid) -> BulkWalk<T>
1002    where
1003        T: 'static,
1004    {
1005        let ordering = self.inner.config.oid_ordering;
1006        let max_results = self.inner.config.max_walk_results;
1007        let max_repetitions = self.inner.config.max_repetitions as i32;
1008        BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
1009    }
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014    use super::*;
1015    use crate::message::CommunityMessage;
1016    use crate::oid::Oid;
1017    use crate::pdu::{Pdu, PduType};
1018    use crate::varbind::VarBind;
1019    use crate::version::Version;
1020    use bytes::Bytes;
1021    use std::collections::VecDeque;
1022    use std::net::SocketAddr;
1023    use std::sync::{Arc, Mutex};
1024
1025    // -------------------------------------------------------------------------
1026    // Mock transport that returns a response with a configurable number of
1027    // varbinds, regardless of how many were requested.
1028    // -------------------------------------------------------------------------
1029
1030    #[derive(Clone)]
1031    struct TruncatingTransport {
1032        /// Number of varbinds to include in each response.
1033        response_varbind_count: usize,
1034        /// Captured (request_id) values from sent requests, stored for building
1035        /// responses.
1036        pending: Arc<Mutex<VecDeque<i32>>>,
1037    }
1038
1039    impl TruncatingTransport {
1040        fn new(response_varbind_count: usize) -> Self {
1041            Self {
1042                response_varbind_count,
1043                pending: Arc::new(Mutex::new(VecDeque::new())),
1044            }
1045        }
1046    }
1047
1048    impl Transport for TruncatingTransport {
1049        fn send(&self, data: &[u8]) -> impl std::future::Future<Output = Result<()>> + Send {
1050            // Decode the sent request to extract the request_id.
1051            let request_id = crate::transport::extract_request_id(data).unwrap_or(1);
1052            {
1053                let mut q = self.pending.lock().unwrap();
1054                q.push_back(request_id);
1055            }
1056            async { Ok(()) }
1057        }
1058
1059        fn recv(
1060            &self,
1061            _request_id: i32,
1062        ) -> impl std::future::Future<Output = Result<(Bytes, SocketAddr)>> + Send {
1063            let request_id = {
1064                let mut q = self.pending.lock().unwrap();
1065                q.pop_front().unwrap_or(1)
1066            };
1067            let n = self.response_varbind_count;
1068            let peer: SocketAddr = "127.0.0.1:161".parse().unwrap();
1069
1070            async move {
1071                // Build a response PDU with n varbinds (NULL values).
1072                let varbinds: Vec<VarBind> = (0..n)
1073                    .map(|i| {
1074                        VarBind::new(
1075                            Oid::from_slice(&[1, 3, 6, 1, i as u32]),
1076                            crate::value::Value::Null,
1077                        )
1078                    })
1079                    .collect();
1080
1081                let pdu = Pdu {
1082                    pdu_type: PduType::Response,
1083                    request_id,
1084                    error_status: 0,
1085                    error_index: 0,
1086                    varbinds,
1087                };
1088
1089                let msg = CommunityMessage::v2c(Bytes::from_static(b"public"), pdu);
1090                let encoded = msg.encode();
1091                Ok((encoded, peer))
1092            }
1093        }
1094
1095        fn peer_addr(&self) -> SocketAddr {
1096            "127.0.0.1:161".parse().unwrap()
1097        }
1098
1099        fn local_addr(&self) -> SocketAddr {
1100            "127.0.0.1:0".parse().unwrap()
1101        }
1102
1103        fn is_reliable(&self) -> bool {
1104            true
1105        }
1106    }
1107
1108    fn make_client(response_varbind_count: usize) -> Client<TruncatingTransport> {
1109        let transport = TruncatingTransport::new(response_varbind_count);
1110        let config = ClientConfig {
1111            version: Version::V2c,
1112            max_oids_per_request: 10,
1113            retry: crate::client::retry::Retry::none(),
1114            ..Default::default()
1115        };
1116        Client::new(transport, config)
1117    }
1118
1119    #[tokio::test]
1120    async fn get_many_warns_on_truncated_response() {
1121        // Request 3 OIDs but the mock returns only 1 varbind - should warn and return what we got.
1122        let client = make_client(1);
1123        let oids = [
1124            Oid::from_slice(&[1, 3, 6, 1, 1]),
1125            Oid::from_slice(&[1, 3, 6, 1, 2]),
1126            Oid::from_slice(&[1, 3, 6, 1, 3]),
1127        ];
1128
1129        let result = client.get_many(&oids).await;
1130        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1131        assert_eq!(result.unwrap().len(), 1);
1132    }
1133
1134    #[tokio::test]
1135    async fn get_many_rejects_inflated_response() {
1136        // Request 3 OIDs but the mock returns 5 varbinds.
1137        let client = make_client(5);
1138        let oids = [
1139            Oid::from_slice(&[1, 3, 6, 1, 1]),
1140            Oid::from_slice(&[1, 3, 6, 1, 2]),
1141            Oid::from_slice(&[1, 3, 6, 1, 3]),
1142        ];
1143
1144        let err = client.get_many(&oids).await.unwrap_err();
1145        assert!(
1146            matches!(*err, Error::MalformedResponse { .. }),
1147            "expected MalformedResponse, got: {err}"
1148        );
1149    }
1150
1151    #[tokio::test]
1152    async fn get_many_accepts_correct_response_count() {
1153        // Request 3 OIDs and the mock returns exactly 3 varbinds.
1154        let client = make_client(3);
1155        let oids = [
1156            Oid::from_slice(&[1, 3, 6, 1, 1]),
1157            Oid::from_slice(&[1, 3, 6, 1, 2]),
1158            Oid::from_slice(&[1, 3, 6, 1, 3]),
1159        ];
1160
1161        let result = client.get_many(&oids).await;
1162        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1163        assert_eq!(result.unwrap().len(), 3);
1164    }
1165
1166    #[tokio::test]
1167    async fn get_next_many_warns_on_truncated_response() {
1168        // Request 3 OIDs but the mock returns only 1 varbind - should warn and return what we got.
1169        let client = make_client(1);
1170        let oids = [
1171            Oid::from_slice(&[1, 3, 6, 1, 1]),
1172            Oid::from_slice(&[1, 3, 6, 1, 2]),
1173            Oid::from_slice(&[1, 3, 6, 1, 3]),
1174        ];
1175
1176        let result = client.get_next_many(&oids).await;
1177        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1178        assert_eq!(result.unwrap().len(), 1);
1179    }
1180
1181    #[tokio::test]
1182    async fn get_next_many_rejects_inflated_response() {
1183        // Request 3 OIDs but the mock returns 5 varbinds.
1184        let client = make_client(5);
1185        let oids = [
1186            Oid::from_slice(&[1, 3, 6, 1, 1]),
1187            Oid::from_slice(&[1, 3, 6, 1, 2]),
1188            Oid::from_slice(&[1, 3, 6, 1, 3]),
1189        ];
1190
1191        let err = client.get_next_many(&oids).await.unwrap_err();
1192        assert!(
1193            matches!(*err, Error::MalformedResponse { .. }),
1194            "expected MalformedResponse, got: {err}"
1195        );
1196    }
1197
1198    #[tokio::test]
1199    async fn get_next_many_accepts_correct_response_count() {
1200        // Request 3 OIDs and the mock returns exactly 3 varbinds.
1201        let client = make_client(3);
1202        let oids = [
1203            Oid::from_slice(&[1, 3, 6, 1, 1]),
1204            Oid::from_slice(&[1, 3, 6, 1, 2]),
1205            Oid::from_slice(&[1, 3, 6, 1, 3]),
1206        ];
1207
1208        let result = client.get_next_many(&oids).await;
1209        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1210        assert_eq!(result.unwrap().len(), 3);
1211    }
1212
1213    #[tokio::test]
1214    async fn set_many_warns_on_truncated_response() {
1215        // Request 3 varbinds but the mock returns only 1 - should warn and return what we got.
1216        let client = make_client(1);
1217        let varbinds = [
1218            (
1219                Oid::from_slice(&[1, 3, 6, 1, 1]),
1220                crate::value::Value::Integer(1),
1221            ),
1222            (
1223                Oid::from_slice(&[1, 3, 6, 1, 2]),
1224                crate::value::Value::Integer(2),
1225            ),
1226            (
1227                Oid::from_slice(&[1, 3, 6, 1, 3]),
1228                crate::value::Value::Integer(3),
1229            ),
1230        ];
1231
1232        let result = client.set_many(&varbinds).await;
1233        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1234        assert_eq!(result.unwrap().len(), 1);
1235    }
1236
1237    #[tokio::test]
1238    async fn set_many_rejects_inflated_response() {
1239        // Request 3 varbinds but the mock returns 5.
1240        let client = make_client(5);
1241        let varbinds = [
1242            (
1243                Oid::from_slice(&[1, 3, 6, 1, 1]),
1244                crate::value::Value::Integer(1),
1245            ),
1246            (
1247                Oid::from_slice(&[1, 3, 6, 1, 2]),
1248                crate::value::Value::Integer(2),
1249            ),
1250            (
1251                Oid::from_slice(&[1, 3, 6, 1, 3]),
1252                crate::value::Value::Integer(3),
1253            ),
1254        ];
1255
1256        let err = client.set_many(&varbinds).await.unwrap_err();
1257        assert!(
1258            matches!(*err, Error::MalformedResponse { .. }),
1259            "expected MalformedResponse, got: {err}"
1260        );
1261    }
1262
1263    #[tokio::test]
1264    async fn set_many_accepts_correct_response_count() {
1265        // Request 3 varbinds and the mock returns exactly 3.
1266        let client = make_client(3);
1267        let varbinds = [
1268            (
1269                Oid::from_slice(&[1, 3, 6, 1, 1]),
1270                crate::value::Value::Integer(1),
1271            ),
1272            (
1273                Oid::from_slice(&[1, 3, 6, 1, 2]),
1274                crate::value::Value::Integer(2),
1275            ),
1276            (
1277                Oid::from_slice(&[1, 3, 6, 1, 3]),
1278                crate::value::Value::Integer(3),
1279            ),
1280        ];
1281
1282        let result = client.set_many(&varbinds).await;
1283        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1284        assert_eq!(result.unwrap().len(), 3);
1285    }
1286
1287    // -------------------------------------------------------------------------
1288    // Mock transport that returns tooBig when request exceeds a varbind threshold.
1289    // -------------------------------------------------------------------------
1290
1291    #[derive(Clone)]
1292    struct TooBigTransport {
1293        /// Max varbinds per request before returning tooBig.
1294        max_varbinds: usize,
1295        pending: Arc<Mutex<VecDeque<(i32, usize)>>>,
1296    }
1297
1298    impl TooBigTransport {
1299        fn new(max_varbinds: usize) -> Self {
1300            Self {
1301                max_varbinds,
1302                pending: Arc::new(Mutex::new(VecDeque::new())),
1303            }
1304        }
1305    }
1306
1307    impl Transport for TooBigTransport {
1308        fn send(&self, data: &[u8]) -> impl std::future::Future<Output = Result<()>> + Send {
1309            let request_id = crate::transport::extract_request_id(data).unwrap_or(1);
1310            // Decode the message to count varbinds
1311            let msg = CommunityMessage::decode(Bytes::copy_from_slice(data)).unwrap();
1312            let varbind_count = msg.pdu.standard().unwrap().varbinds.len();
1313            {
1314                let mut q = self.pending.lock().unwrap();
1315                q.push_back((request_id, varbind_count));
1316            }
1317            async { Ok(()) }
1318        }
1319
1320        fn recv(
1321            &self,
1322            _request_id: i32,
1323        ) -> impl std::future::Future<Output = Result<(Bytes, SocketAddr)>> + Send {
1324            let (request_id, varbind_count) = {
1325                let mut q = self.pending.lock().unwrap();
1326                q.pop_front().unwrap_or((1, 0))
1327            };
1328            let max = self.max_varbinds;
1329            let peer: SocketAddr = "127.0.0.1:161".parse().unwrap();
1330
1331            async move {
1332                let pdu = if varbind_count > max {
1333                    // Return tooBig with empty varbinds (per RFC 3416)
1334                    Pdu {
1335                        pdu_type: PduType::Response,
1336                        request_id,
1337                        error_status: ErrorStatus::TooBig.as_i32(),
1338                        error_index: 0,
1339                        varbinds: vec![],
1340                    }
1341                } else {
1342                    // Echo back one varbind per requested OID
1343                    let varbinds: Vec<VarBind> = (0..varbind_count)
1344                        .map(|i| {
1345                            VarBind::new(
1346                                Oid::from_slice(&[1, 3, 6, 1, i as u32]),
1347                                crate::value::Value::Integer(i as i32),
1348                            )
1349                        })
1350                        .collect();
1351                    Pdu {
1352                        pdu_type: PduType::Response,
1353                        request_id,
1354                        error_status: 0,
1355                        error_index: 0,
1356                        varbinds,
1357                    }
1358                };
1359
1360                let msg = CommunityMessage::v2c(Bytes::from_static(b"public"), pdu);
1361                Ok((msg.encode(), peer))
1362            }
1363        }
1364
1365        fn peer_addr(&self) -> SocketAddr {
1366            "127.0.0.1:161".parse().unwrap()
1367        }
1368
1369        fn local_addr(&self) -> SocketAddr {
1370            "127.0.0.1:0".parse().unwrap()
1371        }
1372
1373        fn is_reliable(&self) -> bool {
1374            true
1375        }
1376    }
1377
1378    #[tokio::test]
1379    async fn get_many_bisects_on_too_big() {
1380        // Agent can handle at most 3 varbinds per request. We ask for 8.
1381        // With max_oids_per_request=10, the initial batch is all 8 OIDs.
1382        // That triggers tooBig, so it bisects to 4+4, each of which still
1383        // triggers tooBig, then bisects to 2+2+2+2 which all succeed.
1384        let transport = TooBigTransport::new(3);
1385        let config = ClientConfig {
1386            version: Version::V2c,
1387            max_oids_per_request: 10,
1388            retry: crate::client::retry::Retry::none(),
1389            ..Default::default()
1390        };
1391        let client = Client::new(transport, config);
1392
1393        let oids: Vec<Oid> = (0..8u32)
1394            .map(|i| Oid::from_slice(&[1, 3, 6, 1, i]))
1395            .collect();
1396
1397        let result = client.get_many(&oids).await.unwrap();
1398        assert_eq!(result.len(), 8);
1399    }
1400
1401    #[tokio::test]
1402    async fn get_many_single_oid_too_big_is_unrecoverable() {
1403        // Agent returns tooBig even for a single OID - can't bisect further.
1404        let transport = TooBigTransport::new(0);
1405        let config = ClientConfig {
1406            version: Version::V2c,
1407            max_oids_per_request: 10,
1408            retry: crate::client::retry::Retry::none(),
1409            ..Default::default()
1410        };
1411        let client = Client::new(transport, config);
1412
1413        let oids = [Oid::from_slice(&[1, 3, 6, 1, 1])];
1414        let err = client.get_many(&oids).await.unwrap_err();
1415        assert!(
1416            matches!(
1417                &*err,
1418                Error::Snmp {
1419                    status: ErrorStatus::TooBig,
1420                    ..
1421                }
1422            ),
1423            "expected TooBig, got: {err}"
1424        );
1425    }
1426
1427    #[tokio::test]
1428    async fn get_next_many_bisects_on_too_big() {
1429        // Same as get_many test but for GETNEXT.
1430        let transport = TooBigTransport::new(3);
1431        let config = ClientConfig {
1432            version: Version::V2c,
1433            max_oids_per_request: 10,
1434            retry: crate::client::retry::Retry::none(),
1435            ..Default::default()
1436        };
1437        let client = Client::new(transport, config);
1438
1439        let oids: Vec<Oid> = (0..8u32)
1440            .map(|i| Oid::from_slice(&[1, 3, 6, 1, i]))
1441            .collect();
1442
1443        let result = client.get_next_many(&oids).await.unwrap();
1444        assert_eq!(result.len(), 8);
1445    }
1446
1447    // Batched path: get_many with more OIDs than max_per_request.
1448    #[tokio::test]
1449    async fn get_many_batched_warns_on_truncated_response() {
1450        // max_oids_per_request = 10, request 12 OIDs, mock returns 1 per batch.
1451        // Should warn and return 2 varbinds (1 per batch).
1452        let transport = TruncatingTransport::new(1);
1453        let config = ClientConfig {
1454            version: Version::V2c,
1455            max_oids_per_request: 10,
1456            retry: crate::client::retry::Retry::none(),
1457            ..Default::default()
1458        };
1459        let client = Client::new(transport, config);
1460
1461        let oids: Vec<Oid> = (0..12u32)
1462            .map(|i| Oid::from_slice(&[1, 3, 6, 1, i]))
1463            .collect();
1464
1465        let result = client.get_many(&oids).await;
1466        assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
1467        assert_eq!(result.unwrap().len(), 2); // 1 varbind per batch, 2 batches
1468    }
1469
1470    #[tokio::test]
1471    async fn get_many_batched_rejects_inflated_response() {
1472        // max_oids_per_request = 10, request 12 OIDs, mock returns 12 per batch.
1473        let transport = TruncatingTransport::new(12);
1474        let config = ClientConfig {
1475            version: Version::V2c,
1476            max_oids_per_request: 10,
1477            retry: crate::client::retry::Retry::none(),
1478            ..Default::default()
1479        };
1480        let client = Client::new(transport, config);
1481
1482        let oids: Vec<Oid> = (0..12u32)
1483            .map(|i| Oid::from_slice(&[1, 3, 6, 1, i]))
1484            .collect();
1485
1486        let err = client.get_many(&oids).await.unwrap_err();
1487        assert!(
1488            matches!(*err, Error::MalformedResponse { .. }),
1489            "expected MalformedResponse, got: {err}"
1490        );
1491    }
1492}