async_snmp/agent/
mod.rs

1//! SNMP Agent (RFC 3413).
2//!
3//! This module provides SNMP agent functionality for responding to
4//! GET, GETNEXT, GETBULK, and SET requests.
5//!
6//! # Features
7//!
8//! - **Async handlers**: All handler methods are async for database queries, network calls, etc.
9//! - **Atomic SET**: Two-phase commit protocol (test/commit/undo) per RFC 3416
10//! - **VACM support**: Optional View-based Access Control Model (RFC 3415)
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use async_snmp::agent::Agent;
16//! use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
17//! use async_snmp::{Oid, Value, VarBind, oid};
18//! use std::sync::Arc;
19//!
20//! // Define a simple handler for the system MIB subtree
21//! struct SystemMibHandler;
22//!
23//! impl MibHandler for SystemMibHandler {
24//!     fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
25//!         Box::pin(async move {
26//!             // sysDescr.0
27//!             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
28//!                 return GetResult::Value(Value::OctetString("My SNMP Agent".into()));
29//!             }
30//!             // sysObjectID.0
31//!             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 2, 0) {
32//!                 return GetResult::Value(Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999)));
33//!             }
34//!             GetResult::NoSuchObject
35//!         })
36//!     }
37//!
38//!     fn get_next<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetNextResult> {
39//!         Box::pin(async move {
40//!             // Return the lexicographically next OID after the given one
41//!             let sys_descr = oid!(1, 3, 6, 1, 2, 1, 1, 1, 0);
42//!             let sys_object_id = oid!(1, 3, 6, 1, 2, 1, 1, 2, 0);
43//!
44//!             if oid < &sys_descr {
45//!                 return GetNextResult::Value(VarBind::new(sys_descr, Value::OctetString("My SNMP Agent".into())));
46//!             }
47//!             if oid < &sys_object_id {
48//!                 return GetNextResult::Value(VarBind::new(sys_object_id, Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999))));
49//!             }
50//!             GetNextResult::EndOfMibView
51//!         })
52//!     }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), async_snmp::Error> {
57//!     let agent = Agent::builder()
58//!         .bind("0.0.0.0:161")
59//!         .community(b"public")
60//!         .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemMibHandler))
61//!         .build()
62//!         .await?;
63//!
64//!     agent.run().await
65//! }
66//! ```
67
68mod request;
69mod response;
70mod set_handler;
71pub mod vacm;
72
73pub use vacm::{SecurityModel, VacmBuilder, VacmConfig, View, ViewCheckResult, ViewSubtree};
74
75use std::collections::HashMap;
76use std::net::SocketAddr;
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU32, Ordering};
79use std::time::Instant;
80
81use bytes::Bytes;
82use subtle::ConstantTimeEq;
83use tokio::net::UdpSocket;
84use tokio::sync::Semaphore;
85use tokio_util::sync::CancellationToken;
86use tracing::instrument;
87
88use std::io::IoSliceMut;
89
90use quinn_udp::{RecvMeta, Transmit, UdpSockRef, UdpSocketState};
91
92use crate::ber::Decoder;
93use crate::error::{DecodeErrorKind, Error, ErrorStatus, Result};
94use crate::handler::{GetNextResult, GetResult, MibHandler, RequestContext};
95use crate::notification::UsmUserConfig;
96use crate::oid::Oid;
97use crate::pdu::{Pdu, PduType};
98use crate::util::bind_udp_socket;
99use crate::v3::SaltCounter;
100use crate::value::Value;
101use crate::varbind::VarBind;
102use crate::version::Version;
103
104/// Default maximum message size for UDP (RFC 3417 recommendation).
105const DEFAULT_MAX_MESSAGE_SIZE: usize = 1472;
106
107/// Overhead for SNMP message encoding (approximate conservative estimate).
108/// This accounts for version, community/USM, PDU headers, etc.
109const RESPONSE_OVERHEAD: usize = 100;
110
111/// Registered handler with its OID prefix.
112pub(crate) struct RegisteredHandler {
113    pub(crate) prefix: Oid,
114    pub(crate) handler: Arc<dyn MibHandler>,
115}
116
117/// Builder for [`Agent`].
118///
119/// Use this builder to configure and construct an SNMP agent. The builder
120/// pattern allows you to chain configuration methods before calling
121/// [`build()`](AgentBuilder::build) to create the agent.
122///
123/// # Minimal Example
124///
125/// ```rust,no_run
126/// use async_snmp::agent::Agent;
127/// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
128/// use async_snmp::{Oid, Value, VarBind, oid};
129/// use std::sync::Arc;
130///
131/// struct MyHandler;
132/// impl MibHandler for MyHandler {
133///     fn get<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetResult> {
134///         Box::pin(async { GetResult::NoSuchObject })
135///     }
136///     fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
137///         Box::pin(async { GetNextResult::EndOfMibView })
138///     }
139/// }
140///
141/// # async fn example() -> Result<(), async_snmp::Error> {
142/// let agent = Agent::builder()
143///     .bind("0.0.0.0:1161")  // Use non-privileged port
144///     .community(b"public")
145///     .handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(MyHandler))
146///     .build()
147///     .await?;
148/// # Ok(())
149/// # }
150/// ```
151pub struct AgentBuilder {
152    bind_addr: String,
153    communities: Vec<Vec<u8>>,
154    usm_users: HashMap<Bytes, UsmUserConfig>,
155    handlers: Vec<RegisteredHandler>,
156    engine_id: Option<Vec<u8>>,
157    max_message_size: usize,
158    max_concurrent_requests: Option<usize>,
159    recv_buffer_size: Option<usize>,
160    vacm: Option<VacmConfig>,
161    cancel: Option<CancellationToken>,
162}
163
164impl AgentBuilder {
165    /// Create a new builder with default settings.
166    ///
167    /// Defaults:
168    /// - Bind address: `0.0.0.0:161` (UDP)
169    /// - Max message size: 1472 bytes (Ethernet MTU - IP/UDP headers)
170    /// - Max concurrent requests: 1000
171    /// - Receive buffer size: 4MB (requested from kernel)
172    /// - No communities or USM users (all requests rejected)
173    /// - No handlers registered
174    pub fn new() -> Self {
175        Self {
176            bind_addr: "0.0.0.0:161".to_string(),
177            communities: Vec::new(),
178            usm_users: HashMap::new(),
179            handlers: Vec::new(),
180            engine_id: None,
181            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
182            max_concurrent_requests: Some(1000),
183            recv_buffer_size: Some(4 * 1024 * 1024), // 4MB
184            vacm: None,
185            cancel: None,
186        }
187    }
188
189    /// Set the UDP bind address.
190    ///
191    /// Default is `0.0.0.0:161` (standard SNMP agent port). Note that binding
192    /// to UDP port 161 typically requires root/administrator privileges.
193    ///
194    /// # IPv4 Examples
195    ///
196    /// ```rust,no_run
197    /// use async_snmp::agent::Agent;
198    ///
199    /// # async fn example() -> Result<(), async_snmp::Error> {
200    /// // Bind to all IPv4 interfaces on standard port (requires privileges)
201    /// let agent = Agent::builder().bind("0.0.0.0:161").community(b"public").build().await?;
202    ///
203    /// // Bind to localhost only on non-privileged port
204    /// let agent = Agent::builder().bind("127.0.0.1:1161").community(b"public").build().await?;
205    ///
206    /// // Bind to specific interface
207    /// let agent = Agent::builder().bind("192.168.1.100:161").community(b"public").build().await?;
208    /// # Ok(())
209    /// # }
210    /// ```
211    ///
212    /// # IPv6 / Dual-Stack Examples
213    ///
214    /// ```rust,no_run
215    /// use async_snmp::agent::Agent;
216    ///
217    /// # async fn example() -> Result<(), async_snmp::Error> {
218    /// // Bind to all interfaces via dual-stack (handles both IPv4 and IPv6)
219    /// let agent = Agent::builder().bind("[::]:161").community(b"public").build().await?;
220    ///
221    /// // Bind to IPv6 localhost only
222    /// let agent = Agent::builder().bind("[::1]:1161").community(b"public").build().await?;
223    /// # Ok(())
224    /// # }
225    /// ```
226    pub fn bind(mut self, addr: impl Into<String>) -> Self {
227        self.bind_addr = addr.into();
228        self
229    }
230
231    /// Add an accepted community string for v1/v2c requests.
232    ///
233    /// Multiple communities can be added. If none are added,
234    /// all v1/v2c requests are rejected.
235    ///
236    /// # Example
237    ///
238    /// ```rust,no_run
239    /// use async_snmp::agent::Agent;
240    ///
241    /// # async fn example() -> Result<(), async_snmp::Error> {
242    /// let agent = Agent::builder()
243    ///     .bind("0.0.0.0:1161")
244    ///     .community(b"public")   // Read-only access
245    ///     .community(b"private")  // Read-write access (with VACM)
246    ///     .build()
247    ///     .await?;
248    /// # Ok(())
249    /// # }
250    /// ```
251    pub fn community(mut self, community: &[u8]) -> Self {
252        self.communities.push(community.to_vec());
253        self
254    }
255
256    /// Add multiple community strings.
257    ///
258    /// # Example
259    ///
260    /// ```rust,no_run
261    /// use async_snmp::agent::Agent;
262    ///
263    /// # async fn example() -> Result<(), async_snmp::Error> {
264    /// let communities = ["public", "private", "monitor"];
265    /// let agent = Agent::builder()
266    ///     .bind("0.0.0.0:1161")
267    ///     .communities(communities)
268    ///     .build()
269    ///     .await?;
270    /// # Ok(())
271    /// # }
272    /// ```
273    pub fn communities<I, C>(mut self, communities: I) -> Self
274    where
275        I: IntoIterator<Item = C>,
276        C: AsRef<[u8]>,
277    {
278        for c in communities {
279            self.communities.push(c.as_ref().to_vec());
280        }
281        self
282    }
283
284    /// Add a USM user for SNMPv3 authentication.
285    ///
286    /// Configure authentication and privacy settings using the closure.
287    /// Multiple users can be added with different security levels.
288    ///
289    /// # Security Levels
290    ///
291    /// - **noAuthNoPriv**: No authentication or encryption (not recommended)
292    /// - **authNoPriv**: Authentication only (HMAC verification)
293    /// - **authPriv**: Authentication and encryption (most secure)
294    ///
295    /// # Example
296    ///
297    /// ```rust,no_run
298    /// use async_snmp::agent::Agent;
299    /// use async_snmp::{AuthProtocol, PrivProtocol};
300    ///
301    /// # async fn example() -> Result<(), async_snmp::Error> {
302    /// let agent = Agent::builder()
303    ///     .bind("0.0.0.0:1161")
304    ///     // Read-only user with authentication only
305    ///     .usm_user("monitor", |u| {
306    ///         u.auth(AuthProtocol::Sha256, b"monitorpass123")
307    ///     })
308    ///     // Admin user with full encryption
309    ///     .usm_user("admin", |u| {
310    ///         u.auth(AuthProtocol::Sha256, b"adminauth123")
311    ///          .privacy(PrivProtocol::Aes128, b"adminpriv123")
312    ///     })
313    ///     .build()
314    ///     .await?;
315    /// # Ok(())
316    /// # }
317    /// ```
318    pub fn usm_user<F>(mut self, username: impl Into<Bytes>, configure: F) -> Self
319    where
320        F: FnOnce(UsmUserConfig) -> UsmUserConfig,
321    {
322        let username_bytes: Bytes = username.into();
323        let config = configure(UsmUserConfig::new(username_bytes.clone()));
324        self.usm_users.insert(username_bytes, config);
325        self
326    }
327
328    /// Set the engine ID for SNMPv3.
329    ///
330    /// If not set, a default engine ID will be generated based on the
331    /// RFC 3411 format using enterprise number and timestamp.
332    ///
333    /// # Example
334    ///
335    /// ```rust,no_run
336    /// use async_snmp::agent::Agent;
337    ///
338    /// # async fn example() -> Result<(), async_snmp::Error> {
339    /// let agent = Agent::builder()
340    ///     .bind("0.0.0.0:1161")
341    ///     .engine_id(b"\x80\x00\x00\x00\x01MyEngine".to_vec())
342    ///     .community(b"public")
343    ///     .build()
344    ///     .await?;
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub fn engine_id(mut self, engine_id: impl Into<Vec<u8>>) -> Self {
349        self.engine_id = Some(engine_id.into());
350        self
351    }
352
353    /// Set the maximum message size for responses.
354    ///
355    /// Default is 1472 octets (fits Ethernet MTU minus IP/UDP headers).
356    /// GETBULK responses will be truncated to fit within this limit.
357    ///
358    /// For SNMPv3 requests, the agent uses the minimum of this value
359    /// and the msgMaxSize from the request.
360    pub fn max_message_size(mut self, size: usize) -> Self {
361        self.max_message_size = size;
362        self
363    }
364
365    /// Set the maximum number of concurrent requests the agent will process.
366    ///
367    /// Default is 1000. Requests beyond this limit will queue until a slot
368    /// becomes available. Set to `None` for unbounded concurrency.
369    ///
370    /// This controls memory usage under high load while still allowing
371    /// parallel request processing.
372    pub fn max_concurrent_requests(mut self, limit: Option<usize>) -> Self {
373        self.max_concurrent_requests = limit;
374        self
375    }
376
377    /// Set the UDP socket receive buffer size.
378    ///
379    /// Default is 4MB. The kernel may cap this at `net.core.rmem_max`.
380    /// A larger buffer prevents packet loss during request bursts.
381    ///
382    /// Set to `None` to use the kernel default.
383    pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
384        self.recv_buffer_size = size;
385        self
386    }
387
388    /// Register a MIB handler for an OID subtree.
389    ///
390    /// Handlers are matched by longest prefix. When a request comes in,
391    /// the handler with the longest matching prefix is used.
392    ///
393    /// # Example
394    ///
395    /// ```rust,no_run
396    /// use async_snmp::agent::Agent;
397    /// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
398    /// use async_snmp::{Oid, Value, VarBind, oid};
399    /// use std::sync::Arc;
400    ///
401    /// struct SystemHandler;
402    /// impl MibHandler for SystemHandler {
403    ///     fn get<'a>(&'a self, _: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
404    ///         Box::pin(async move {
405    ///             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
406    ///                 GetResult::Value(Value::OctetString("My Agent".into()))
407    ///             } else {
408    ///                 GetResult::NoSuchObject
409    ///             }
410    ///         })
411    ///     }
412    ///     fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
413    ///         Box::pin(async { GetNextResult::EndOfMibView })
414    ///     }
415    /// }
416    ///
417    /// # async fn example() -> Result<(), async_snmp::Error> {
418    /// let agent = Agent::builder()
419    ///     .bind("0.0.0.0:1161")
420    ///     .community(b"public")
421    ///     // Register handler for system MIB subtree
422    ///     .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemHandler))
423    ///     .build()
424    ///     .await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub fn handler(mut self, prefix: Oid, handler: Arc<dyn MibHandler>) -> Self {
429        self.handlers.push(RegisteredHandler { prefix, handler });
430        self
431    }
432
433    /// Configure VACM (View-based Access Control Model) using a builder function.
434    ///
435    /// When VACM is enabled, all requests are checked against the configured
436    /// access control rules. Requests that don't have proper access are rejected
437    /// with `noAccess` error (v2c/v3) or `noSuchName` (v1).
438    ///
439    /// # Example
440    ///
441    /// ```rust,no_run
442    /// use async_snmp::agent::{Agent, SecurityModel, VacmBuilder};
443    /// use async_snmp::message::SecurityLevel;
444    /// use async_snmp::oid;
445    ///
446    /// # async fn example() -> Result<(), async_snmp::Error> {
447    /// let agent = Agent::builder()
448    ///     .bind("0.0.0.0:161")
449    ///     .community(b"public")
450    ///     .community(b"private")
451    ///     .vacm(|v| v
452    ///         .group("public", SecurityModel::V2c, "readonly_group")
453    ///         .group("private", SecurityModel::V2c, "readwrite_group")
454    ///         .access("readonly_group", |a| a
455    ///             .read_view("full_view"))
456    ///         .access("readwrite_group", |a| a
457    ///             .read_view("full_view")
458    ///             .write_view("write_view"))
459    ///         .view("full_view", |v| v
460    ///             .include(oid!(1, 3, 6, 1)))
461    ///         .view("write_view", |v| v
462    ///             .include(oid!(1, 3, 6, 1, 2, 1, 1))))
463    ///     .build()
464    ///     .await?;
465    /// # Ok(())
466    /// # }
467    /// ```
468    pub fn vacm<F>(mut self, configure: F) -> Self
469    where
470        F: FnOnce(VacmBuilder) -> VacmBuilder,
471    {
472        let builder = VacmBuilder::new();
473        self.vacm = Some(configure(builder).build());
474        self
475    }
476
477    /// Set a cancellation token for graceful shutdown.
478    ///
479    /// If not set, the agent creates its own token accessible via `Agent::cancel()`.
480    pub fn cancel(mut self, token: CancellationToken) -> Self {
481        self.cancel = Some(token);
482        self
483    }
484
485    /// Build the agent.
486    pub async fn build(mut self) -> Result<Agent> {
487        let bind_addr: std::net::SocketAddr = self.bind_addr.parse().map_err(|_| Error::Io {
488            target: None,
489            source: std::io::Error::new(
490                std::io::ErrorKind::InvalidInput,
491                format!("invalid bind address: {}", self.bind_addr),
492            ),
493        })?;
494
495        let socket = bind_udp_socket(bind_addr, self.recv_buffer_size)
496            .await
497            .map_err(|e| Error::Io {
498                target: Some(bind_addr),
499                source: e,
500            })?;
501
502        let local_addr = socket.local_addr().map_err(|e| Error::Io {
503            target: Some(bind_addr),
504            source: e,
505        })?;
506
507        let socket_state =
508            UdpSocketState::new(UdpSockRef::from(&socket)).map_err(|e| Error::Io {
509                target: Some(bind_addr),
510                source: e,
511            })?;
512
513        // Generate default engine ID if not provided
514        let engine_id = self.engine_id.unwrap_or_else(|| {
515            // RFC 3411 format: enterprise number + format + local identifier
516            // Use a simple format: 0x80 (local) + timestamp + random
517            let mut id = vec![0x80, 0x00, 0x00, 0x00, 0x01]; // Enterprise format indicator
518            let timestamp = std::time::SystemTime::now()
519                .duration_since(std::time::UNIX_EPOCH)
520                .unwrap_or_default()
521                .as_secs();
522            id.extend_from_slice(&timestamp.to_be_bytes());
523            id
524        });
525
526        // Sort handlers by prefix length (longest first) for matching
527        self.handlers
528            .sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
529
530        let cancel = self.cancel.unwrap_or_default();
531
532        // Create concurrency limiter if configured
533        let concurrency_limit = self
534            .max_concurrent_requests
535            .map(|n| Arc::new(Semaphore::new(n)));
536
537        Ok(Agent {
538            inner: Arc::new(AgentInner {
539                socket: Arc::new(socket),
540                socket_state,
541                local_addr,
542                communities: self.communities,
543                usm_users: self.usm_users,
544                handlers: self.handlers,
545                engine_id,
546                engine_boots: AtomicU32::new(1),
547                engine_time: AtomicU32::new(0),
548                engine_start: Instant::now(),
549                salt_counter: SaltCounter::new(),
550                max_message_size: self.max_message_size,
551                concurrency_limit,
552                vacm: self.vacm,
553                snmp_invalid_msgs: AtomicU32::new(0),
554                snmp_unknown_security_models: AtomicU32::new(0),
555                snmp_silent_drops: AtomicU32::new(0),
556                cancel,
557            }),
558        })
559    }
560}
561
562impl Default for AgentBuilder {
563    fn default() -> Self {
564        Self::new()
565    }
566}
567
568/// Inner state shared across agent clones.
569pub(crate) struct AgentInner {
570    pub(crate) socket: Arc<UdpSocket>,
571    pub(crate) socket_state: UdpSocketState,
572    pub(crate) local_addr: SocketAddr,
573    pub(crate) communities: Vec<Vec<u8>>,
574    pub(crate) usm_users: HashMap<Bytes, UsmUserConfig>,
575    pub(crate) handlers: Vec<RegisteredHandler>,
576    pub(crate) engine_id: Vec<u8>,
577    pub(crate) engine_boots: AtomicU32,
578    pub(crate) engine_time: AtomicU32,
579    pub(crate) engine_start: Instant,
580    pub(crate) salt_counter: SaltCounter,
581    pub(crate) max_message_size: usize,
582    pub(crate) concurrency_limit: Option<Arc<Semaphore>>,
583    pub(crate) vacm: Option<VacmConfig>,
584    // RFC 3412 statistics counters
585    /// snmpInvalidMsgs (1.3.6.1.6.3.11.2.1.2) - messages with invalid msgFlags
586    /// (e.g., privacy without authentication)
587    pub(crate) snmp_invalid_msgs: AtomicU32,
588    /// snmpUnknownSecurityModels (1.3.6.1.6.3.11.2.1.1) - messages with
589    /// unrecognized security model
590    pub(crate) snmp_unknown_security_models: AtomicU32,
591    /// snmpSilentDrops (1.3.6.1.6.3.11.2.1.3) - confirmed-class PDUs silently
592    /// dropped because even an empty response would exceed max message size
593    pub(crate) snmp_silent_drops: AtomicU32,
594    /// Cancellation token for graceful shutdown.
595    pub(crate) cancel: CancellationToken,
596}
597
598/// SNMP Agent.
599///
600/// Listens for and responds to SNMP requests (GET, GETNEXT, GETBULK, SET).
601///
602/// # Example
603///
604/// ```rust,no_run
605/// use async_snmp::agent::Agent;
606/// use async_snmp::oid;
607///
608/// # async fn example() -> Result<(), async_snmp::Error> {
609/// let agent = Agent::builder()
610///     .bind("0.0.0.0:161")
611///     .community(b"public")
612///     .build()
613///     .await?;
614///
615/// agent.run().await
616/// # }
617/// ```
618pub struct Agent {
619    pub(crate) inner: Arc<AgentInner>,
620}
621
622impl Agent {
623    /// Create a builder for configuring the agent.
624    pub fn builder() -> AgentBuilder {
625        AgentBuilder::new()
626    }
627
628    /// Get the local address the agent is bound to.
629    pub fn local_addr(&self) -> SocketAddr {
630        self.inner.local_addr
631    }
632
633    /// Get the engine ID.
634    pub fn engine_id(&self) -> &[u8] {
635        &self.inner.engine_id
636    }
637
638    /// Get the cancellation token for this agent.
639    ///
640    /// Call `token.cancel()` to initiate graceful shutdown.
641    pub fn cancel(&self) -> CancellationToken {
642        self.inner.cancel.clone()
643    }
644
645    /// Get the snmpInvalidMsgs counter value.
646    ///
647    /// This counter tracks messages with invalid msgFlags, such as
648    /// privacy-without-authentication (RFC 3412 Section 7.2 Step 5d).
649    ///
650    /// OID: 1.3.6.1.6.3.11.2.1.2
651    pub fn snmp_invalid_msgs(&self) -> u32 {
652        self.inner.snmp_invalid_msgs.load(Ordering::Relaxed)
653    }
654
655    /// Get the snmpUnknownSecurityModels counter value.
656    ///
657    /// This counter tracks messages with unrecognized security models
658    /// (RFC 3412 Section 7.2 Step 2).
659    ///
660    /// OID: 1.3.6.1.6.3.11.2.1.1
661    pub fn snmp_unknown_security_models(&self) -> u32 {
662        self.inner
663            .snmp_unknown_security_models
664            .load(Ordering::Relaxed)
665    }
666
667    /// Get the snmpSilentDrops counter value.
668    ///
669    /// This counter tracks confirmed-class PDUs (GetRequest, GetNextRequest,
670    /// GetBulkRequest, SetRequest, InformRequest) that were silently dropped
671    /// because even an empty Response-PDU would exceed the maximum message
672    /// size constraint (RFC 3412 Section 7.1).
673    ///
674    /// OID: 1.3.6.1.6.3.11.2.1.3
675    pub fn snmp_silent_drops(&self) -> u32 {
676        self.inner.snmp_silent_drops.load(Ordering::Relaxed)
677    }
678
679    /// Run the agent, processing requests concurrently.
680    ///
681    /// Requests are processed in parallel up to the configured
682    /// `max_concurrent_requests` limit (default: 1000). This method runs
683    /// until the cancellation token is triggered.
684    #[instrument(skip(self), err, fields(snmp.local_addr = %self.local_addr()))]
685    pub async fn run(&self) -> Result<()> {
686        let mut buf = vec![0u8; 65535];
687
688        loop {
689            let recv_meta = tokio::select! {
690                result = self.recv_packet(&mut buf) => {
691                    result?
692                }
693                _ = self.inner.cancel.cancelled() => {
694                    tracing::info!("agent shutdown requested");
695                    return Ok(());
696                }
697            };
698
699            let data = Bytes::copy_from_slice(&buf[..recv_meta.len]);
700            let agent = self.clone();
701
702            let permit = if let Some(ref sem) = self.inner.concurrency_limit {
703                Some(sem.clone().acquire_owned().await.expect("semaphore closed"))
704            } else {
705                None
706            };
707
708            tokio::spawn(async move {
709                agent.update_engine_time();
710
711                match agent.handle_request(data, recv_meta.addr).await {
712                    Ok(Some(response_bytes)) => {
713                        if let Err(e) = agent.send_response(&response_bytes, &recv_meta).await {
714                            tracing::warn!(snmp.source = %recv_meta.addr, error = %e, "failed to send response");
715                        }
716                    }
717                    Ok(None) => {}
718                    Err(e) => {
719                        tracing::warn!(snmp.source = %recv_meta.addr, error = %e, "error handling request");
720                    }
721                }
722
723                drop(permit);
724            });
725        }
726    }
727
728    async fn recv_packet(&self, buf: &mut [u8]) -> Result<RecvMeta> {
729        let mut iov = [IoSliceMut::new(buf)];
730        let mut meta = [RecvMeta::default()];
731
732        loop {
733            self.inner.socket.readable().await.map_err(|e| Error::Io {
734                target: Some(self.inner.local_addr),
735                source: e,
736            })?;
737
738            let result = self.inner.socket.try_io(tokio::io::Interest::READABLE, || {
739                let sref = UdpSockRef::from(&*self.inner.socket);
740                self.inner.socket_state.recv(sref, &mut iov, &mut meta)
741            });
742
743            match result {
744                Ok(n) if n > 0 => return Ok(meta[0]),
745                Ok(_) => continue,
746                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
747                Err(e) => {
748                    return Err(Error::Io {
749                        target: Some(self.inner.local_addr),
750                        source: e,
751                    });
752                }
753            }
754        }
755    }
756
757    async fn send_response(&self, data: &[u8], recv_meta: &RecvMeta) -> std::io::Result<()> {
758        let transmit = Transmit {
759            destination: recv_meta.addr,
760            ecn: None,
761            contents: data,
762            segment_size: None,
763            src_ip: recv_meta.dst_ip,
764        };
765
766        loop {
767            self.inner.socket.writable().await?;
768
769            let result = self.inner.socket.try_io(tokio::io::Interest::WRITABLE, || {
770                let sref = UdpSockRef::from(&*self.inner.socket);
771                self.inner.socket_state.try_send(sref, &transmit)
772            });
773
774            match result {
775                Ok(()) => return Ok(()),
776                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
777                Err(e) => return Err(e),
778            }
779        }
780    }
781
782    /// Process a single request and return the response bytes.
783    ///
784    /// Returns `None` if no response should be sent.
785    async fn handle_request(&self, data: Bytes, source: SocketAddr) -> Result<Option<Bytes>> {
786        // Peek at version
787        let mut decoder = Decoder::new(data.clone());
788        let mut seq = decoder.read_sequence()?;
789        let version_num = seq.read_integer()?;
790        let version = Version::from_i32(version_num).ok_or_else(|| {
791            Error::decode(seq.offset(), DecodeErrorKind::UnknownVersion(version_num))
792        })?;
793        drop(seq);
794        drop(decoder);
795
796        match version {
797            Version::V1 => self.handle_v1(data, source).await,
798            Version::V2c => self.handle_v2c(data, source).await,
799            Version::V3 => self.handle_v3(data, source).await,
800        }
801    }
802
803    /// Update engine time based on elapsed time since start.
804    fn update_engine_time(&self) {
805        let elapsed = self.inner.engine_start.elapsed().as_secs() as u32;
806        self.inner.engine_time.store(elapsed, Ordering::Relaxed);
807    }
808
809    /// Validate community string using constant-time comparison.
810    ///
811    /// Uses constant-time comparison to prevent timing attacks that could
812    /// be used to guess valid community strings character by character.
813    pub(crate) fn validate_community(&self, community: &[u8]) -> bool {
814        if self.inner.communities.is_empty() {
815            // No communities configured = reject all
816            return false;
817        }
818        // Use constant-time comparison for each community string.
819        // We compare against all configured communities regardless of
820        // early matches to maintain constant-time behavior.
821        let mut valid = false;
822        for configured in &self.inner.communities {
823            // ct_eq returns a Choice, which we convert to bool after comparison
824            if configured.len() == community.len()
825                && bool::from(configured.as_slice().ct_eq(community))
826            {
827                valid = true;
828            }
829        }
830        valid
831    }
832
833    /// Dispatch a request to the appropriate handler.
834    async fn dispatch_request(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
835        match pdu.pdu_type {
836            PduType::GetRequest => self.handle_get(ctx, pdu).await,
837            PduType::GetNextRequest => self.handle_get_next(ctx, pdu).await,
838            PduType::GetBulkRequest => self.handle_get_bulk(ctx, pdu).await,
839            PduType::SetRequest => self.handle_set(ctx, pdu).await,
840            PduType::InformRequest => self.handle_inform(pdu),
841            _ => {
842                // Should not happen - filtered earlier
843                Ok(pdu.to_error_response(ErrorStatus::GenErr, 0))
844            }
845        }
846    }
847
848    /// Handle InformRequest PDU.
849    ///
850    /// Per RFC 3416 Section 4.2.7, an InformRequest is a confirmed-class PDU
851    /// that the receiver acknowledges by returning a Response with the same
852    /// request-id and varbind list.
853    fn handle_inform(&self, pdu: &Pdu) -> Result<Pdu> {
854        // Simply acknowledge by returning the same varbinds in a Response
855        Ok(Pdu {
856            pdu_type: PduType::Response,
857            request_id: pdu.request_id,
858            error_status: 0,
859            error_index: 0,
860            varbinds: pdu.varbinds.clone(),
861        })
862    }
863
864    /// Handle GET request.
865    async fn handle_get(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
866        let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
867
868        for (index, vb) in pdu.varbinds.iter().enumerate() {
869            // VACM read access check
870            if let Some(ref vacm) = self.inner.vacm
871                && !vacm.check_access(ctx.read_view.as_ref(), &vb.oid)
872            {
873                // v1: noSuchName, v2c/v3: noAccess or NoSuchObject
874                if ctx.version == Version::V1 {
875                    return Ok(Pdu {
876                        pdu_type: PduType::Response,
877                        request_id: pdu.request_id,
878                        error_status: ErrorStatus::NoSuchName.as_i32(),
879                        error_index: (index + 1) as i32,
880                        varbinds: pdu.varbinds.clone(),
881                    });
882                } else {
883                    // For GET, return NoSuchObject for inaccessible OIDs per RFC 3415
884                    response_varbinds.push(VarBind::new(vb.oid.clone(), Value::NoSuchObject));
885                    continue;
886                }
887            }
888
889            let result = if let Some(handler) = self.find_handler(&vb.oid) {
890                handler.handler.get(ctx, &vb.oid).await
891            } else {
892                GetResult::NoSuchObject
893            };
894
895            let response_value = match result {
896                GetResult::Value(v) => v,
897                GetResult::NoSuchObject => {
898                    // v1 returns noSuchName error, v2c/v3 returns NoSuchObject exception
899                    if ctx.version == Version::V1 {
900                        return Ok(Pdu {
901                            pdu_type: PduType::Response,
902                            request_id: pdu.request_id,
903                            error_status: ErrorStatus::NoSuchName.as_i32(),
904                            error_index: (index + 1) as i32,
905                            varbinds: pdu.varbinds.clone(),
906                        });
907                    } else {
908                        Value::NoSuchObject
909                    }
910                }
911                GetResult::NoSuchInstance => {
912                    // v1 returns noSuchName error, v2c/v3 returns NoSuchInstance exception
913                    if ctx.version == Version::V1 {
914                        return Ok(Pdu {
915                            pdu_type: PduType::Response,
916                            request_id: pdu.request_id,
917                            error_status: ErrorStatus::NoSuchName.as_i32(),
918                            error_index: (index + 1) as i32,
919                            varbinds: pdu.varbinds.clone(),
920                        });
921                    } else {
922                        Value::NoSuchInstance
923                    }
924                }
925            };
926
927            response_varbinds.push(VarBind::new(vb.oid.clone(), response_value));
928        }
929
930        Ok(Pdu {
931            pdu_type: PduType::Response,
932            request_id: pdu.request_id,
933            error_status: 0,
934            error_index: 0,
935            varbinds: response_varbinds,
936        })
937    }
938
939    /// Handle GETNEXT request.
940    async fn handle_get_next(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
941        let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
942
943        for (index, vb) in pdu.varbinds.iter().enumerate() {
944            // Try to find the next OID from any handler
945            let next = self.get_next_oid(ctx, &vb.oid).await;
946
947            // Check VACM access for the returned OID (if VACM enabled)
948            let next = if let Some(ref next_vb) = next {
949                if let Some(ref vacm) = self.inner.vacm {
950                    if vacm.check_access(ctx.read_view.as_ref(), &next_vb.oid) {
951                        next
952                    } else {
953                        // OID not accessible, continue searching
954                        // For simplicity, we just return EndOfMibView here
955                        // A more complete implementation would continue the search
956                        None
957                    }
958                } else {
959                    next
960                }
961            } else {
962                next
963            };
964
965            match next {
966                Some(next_vb) => {
967                    response_varbinds.push(next_vb);
968                }
969                None => {
970                    // v1 returns noSuchName, v2c/v3 returns endOfMibView
971                    if ctx.version == Version::V1 {
972                        return Ok(Pdu {
973                            pdu_type: PduType::Response,
974                            request_id: pdu.request_id,
975                            error_status: ErrorStatus::NoSuchName.as_i32(),
976                            error_index: (index + 1) as i32,
977                            varbinds: pdu.varbinds.clone(),
978                        });
979                    } else {
980                        response_varbinds.push(VarBind::new(vb.oid.clone(), Value::EndOfMibView));
981                    }
982                }
983            }
984        }
985
986        Ok(Pdu {
987            pdu_type: PduType::Response,
988            request_id: pdu.request_id,
989            error_status: 0,
990            error_index: 0,
991            varbinds: response_varbinds,
992        })
993    }
994
995    /// Handle GETBULK request.
996    ///
997    /// Per RFC 3416 Section 4.2.3, if the response would exceed the message
998    /// size limit, we return fewer variable bindings rather than all of them.
999    async fn handle_get_bulk(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
1000        // For GETBULK, error_status is non_repeaters and error_index is max_repetitions
1001        let non_repeaters = pdu.error_status.max(0) as usize;
1002        let max_repetitions = pdu.error_index.max(0) as usize;
1003
1004        let mut response_varbinds = Vec::new();
1005        let mut current_size: usize = RESPONSE_OVERHEAD;
1006        let max_size = self.inner.max_message_size;
1007
1008        // Helper to check if we can add a varbind
1009        let can_add = |vb: &VarBind, current_size: usize| -> bool {
1010            current_size + vb.encoded_size() <= max_size
1011        };
1012
1013        // Handle non-repeaters (first N varbinds get one GETNEXT each)
1014        for vb in pdu.varbinds.iter().take(non_repeaters) {
1015            let next_vb = match self.get_next_oid(ctx, &vb.oid).await {
1016                Some(next_vb) => next_vb,
1017                None => VarBind::new(vb.oid.clone(), Value::EndOfMibView),
1018            };
1019
1020            if !can_add(&next_vb, current_size) {
1021                // Can't fit even non-repeaters, return tooBig if we have nothing
1022                if response_varbinds.is_empty() {
1023                    return Ok(Pdu {
1024                        pdu_type: PduType::Response,
1025                        request_id: pdu.request_id,
1026                        error_status: ErrorStatus::TooBig.as_i32(),
1027                        error_index: 0,
1028                        varbinds: pdu.varbinds.clone(),
1029                    });
1030                }
1031                // Otherwise return what we have
1032                break;
1033            }
1034
1035            current_size += next_vb.encoded_size();
1036            response_varbinds.push(next_vb);
1037        }
1038
1039        // Handle repeaters
1040        if non_repeaters < pdu.varbinds.len() {
1041            let repeaters = &pdu.varbinds[non_repeaters..];
1042            let mut current_oids: Vec<Oid> = repeaters.iter().map(|vb| vb.oid.clone()).collect();
1043            let mut all_done = vec![false; repeaters.len()];
1044
1045            'outer: for _ in 0..max_repetitions {
1046                let mut row_complete = true;
1047                for (i, oid) in current_oids.iter_mut().enumerate() {
1048                    let next_vb = if all_done[i] {
1049                        VarBind::new(oid.clone(), Value::EndOfMibView)
1050                    } else {
1051                        match self.get_next_oid(ctx, oid).await {
1052                            Some(next_vb) => {
1053                                *oid = next_vb.oid.clone();
1054                                row_complete = false;
1055                                next_vb
1056                            }
1057                            None => {
1058                                all_done[i] = true;
1059                                VarBind::new(oid.clone(), Value::EndOfMibView)
1060                            }
1061                        }
1062                    };
1063
1064                    // Check size before adding
1065                    if !can_add(&next_vb, current_size) {
1066                        // Can't fit more, return what we have
1067                        break 'outer;
1068                    }
1069
1070                    current_size += next_vb.encoded_size();
1071                    response_varbinds.push(next_vb);
1072                }
1073
1074                if row_complete {
1075                    break;
1076                }
1077            }
1078        }
1079
1080        Ok(Pdu {
1081            pdu_type: PduType::Response,
1082            request_id: pdu.request_id,
1083            error_status: 0,
1084            error_index: 0,
1085            varbinds: response_varbinds,
1086        })
1087    }
1088
1089    /// Find the handler for a given OID.
1090    pub(crate) fn find_handler(&self, oid: &Oid) -> Option<&RegisteredHandler> {
1091        // Handlers are sorted by prefix length (longest first)
1092        self.inner
1093            .handlers
1094            .iter()
1095            .find(|&handler| handler.handler.handles(&handler.prefix, oid))
1096            .map(|v| v as _)
1097    }
1098
1099    /// Get the next OID from any handler.
1100    async fn get_next_oid(&self, ctx: &RequestContext, oid: &Oid) -> Option<VarBind> {
1101        // Find the first handler that can provide a next OID
1102        let mut best_result: Option<VarBind> = None;
1103
1104        for handler in &self.inner.handlers {
1105            if let GetNextResult::Value(next) = handler.handler.get_next(ctx, oid).await {
1106                // Must be lexicographically greater than the request OID
1107                if next.oid > *oid {
1108                    match &best_result {
1109                        None => best_result = Some(next),
1110                        Some(current) if next.oid < current.oid => best_result = Some(next),
1111                        _ => {}
1112                    }
1113                }
1114            }
1115        }
1116
1117        best_result
1118    }
1119}
1120
1121impl Clone for Agent {
1122    fn clone(&self) -> Self {
1123        Self {
1124            inner: Arc::clone(&self.inner),
1125        }
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use crate::handler::{
1133        BoxFuture, GetNextResult, GetResult, MibHandler, RequestContext, SecurityModel, SetResult,
1134    };
1135    use crate::message::SecurityLevel;
1136    use crate::oid;
1137
1138    struct TestHandler;
1139
1140    impl MibHandler for TestHandler {
1141        fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
1142            Box::pin(async move {
1143                if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0) {
1144                    return GetResult::Value(Value::Integer(42));
1145                }
1146                if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0) {
1147                    return GetResult::Value(Value::OctetString(Bytes::from_static(b"test")));
1148                }
1149                GetResult::NoSuchObject
1150            })
1151        }
1152
1153        fn get_next<'a>(
1154            &'a self,
1155            _ctx: &'a RequestContext,
1156            oid: &'a Oid,
1157        ) -> BoxFuture<'a, GetNextResult> {
1158            Box::pin(async move {
1159                let oid1 = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
1160                let oid2 = oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0);
1161
1162                if oid < &oid1 {
1163                    return GetNextResult::Value(VarBind::new(oid1, Value::Integer(42)));
1164                }
1165                if oid < &oid2 {
1166                    return GetNextResult::Value(VarBind::new(
1167                        oid2,
1168                        Value::OctetString(Bytes::from_static(b"test")),
1169                    ));
1170                }
1171                GetNextResult::EndOfMibView
1172            })
1173        }
1174    }
1175
1176    fn test_ctx() -> RequestContext {
1177        RequestContext {
1178            source: "127.0.0.1:12345".parse().unwrap(),
1179            version: Version::V2c,
1180            security_model: SecurityModel::V2c,
1181            security_name: Bytes::from_static(b"public"),
1182            security_level: SecurityLevel::NoAuthNoPriv,
1183            context_name: Bytes::new(),
1184            request_id: 1,
1185            pdu_type: PduType::GetRequest,
1186            group_name: None,
1187            read_view: None,
1188            write_view: None,
1189        }
1190    }
1191
1192    #[test]
1193    fn test_agent_builder_defaults() {
1194        let builder = AgentBuilder::new();
1195        assert_eq!(builder.bind_addr, "0.0.0.0:161");
1196        assert!(builder.communities.is_empty());
1197        assert!(builder.usm_users.is_empty());
1198        assert!(builder.handlers.is_empty());
1199    }
1200
1201    #[test]
1202    fn test_agent_builder_community() {
1203        let builder = AgentBuilder::new()
1204            .community(b"public")
1205            .community(b"private");
1206        assert_eq!(builder.communities.len(), 2);
1207    }
1208
1209    #[test]
1210    fn test_agent_builder_communities() {
1211        let builder = AgentBuilder::new().communities(["public", "private"]);
1212        assert_eq!(builder.communities.len(), 2);
1213    }
1214
1215    #[test]
1216    fn test_agent_builder_handler() {
1217        let builder =
1218            AgentBuilder::new().handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(TestHandler));
1219        assert_eq!(builder.handlers.len(), 1);
1220    }
1221
1222    #[tokio::test]
1223    async fn test_mib_handler_default_set() {
1224        let handler = TestHandler;
1225        let mut ctx = test_ctx();
1226        ctx.pdu_type = PduType::SetRequest;
1227
1228        let result = handler
1229            .test_set(&ctx, &oid!(1, 3, 6, 1), &Value::Integer(1))
1230            .await;
1231        assert_eq!(result, SetResult::NotWritable);
1232    }
1233
1234    #[test]
1235    fn test_mib_handler_handles() {
1236        let handler = TestHandler;
1237        let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);
1238
1239        // OID within prefix
1240        assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0)));
1241
1242        // OID before prefix (GETNEXT should still try)
1243        assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99998)));
1244
1245        // OID after prefix (not handled)
1246        assert!(!handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 100000)));
1247    }
1248
1249    #[tokio::test]
1250    async fn test_test_handler_get() {
1251        let handler = TestHandler;
1252        let ctx = test_ctx();
1253
1254        // Existing OID
1255        let result = handler
1256            .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1257            .await;
1258        assert!(matches!(result, GetResult::Value(Value::Integer(42))));
1259
1260        // Non-existing OID
1261        let result = handler
1262            .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 99, 0))
1263            .await;
1264        assert!(matches!(result, GetResult::NoSuchObject));
1265    }
1266
1267    #[tokio::test]
1268    async fn test_test_handler_get_next() {
1269        let handler = TestHandler;
1270        let mut ctx = test_ctx();
1271        ctx.pdu_type = PduType::GetNextRequest;
1272
1273        // Before first OID
1274        let next = handler.get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999)).await;
1275        assert!(next.is_value());
1276        if let GetNextResult::Value(vb) = next {
1277            assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0));
1278        }
1279
1280        // Between OIDs
1281        let next = handler
1282            .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1283            .await;
1284        assert!(next.is_value());
1285        if let GetNextResult::Value(vb) = next {
1286            assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0));
1287        }
1288
1289        // After last OID
1290        let next = handler
1291            .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0))
1292            .await;
1293        assert!(next.is_end_of_mib_view());
1294    }
1295}