async_snmp/agent/
mod.rs

1//! SNMP Agent (RFC 3413).
2//!
3//! This module provides SNMP agent functionality for responding to
4//! GET, GETNEXT, GETBULK, and SET requests.
5//!
6//! # Features
7//!
8//! - **Async handlers**: All handler methods are async for database queries, network calls, etc.
9//! - **Atomic SET**: Two-phase commit protocol (test/commit/undo) per RFC 3416
10//! - **VACM support**: Optional View-based Access Control Model (RFC 3415)
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use async_snmp::agent::Agent;
16//! use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
17//! use async_snmp::{Oid, Value, VarBind, oid};
18//! use std::sync::Arc;
19//!
20//! // Define a simple handler for the system MIB subtree
21//! struct SystemMibHandler;
22//!
23//! impl MibHandler for SystemMibHandler {
24//!     fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
25//!         Box::pin(async move {
26//!             // sysDescr.0
27//!             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
28//!                 return GetResult::Value(Value::OctetString("My SNMP Agent".into()));
29//!             }
30//!             // sysObjectID.0
31//!             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 2, 0) {
32//!                 return GetResult::Value(Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999)));
33//!             }
34//!             GetResult::NoSuchObject
35//!         })
36//!     }
37//!
38//!     fn get_next<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetNextResult> {
39//!         Box::pin(async move {
40//!             // Return the lexicographically next OID after the given one
41//!             let sys_descr = oid!(1, 3, 6, 1, 2, 1, 1, 1, 0);
42//!             let sys_object_id = oid!(1, 3, 6, 1, 2, 1, 1, 2, 0);
43//!
44//!             if oid < &sys_descr {
45//!                 return GetNextResult::Value(VarBind::new(sys_descr, Value::OctetString("My SNMP Agent".into())));
46//!             }
47//!             if oid < &sys_object_id {
48//!                 return GetNextResult::Value(VarBind::new(sys_object_id, Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999))));
49//!             }
50//!             GetNextResult::EndOfMibView
51//!         })
52//!     }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Box<async_snmp::Error>> {
57//!     let agent = Agent::builder()
58//!         .bind("0.0.0.0:161")
59//!         .community(b"public")
60//!         .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemMibHandler))
61//!         .build()
62//!         .await?;
63//!
64//!     agent.run().await
65//! }
66//! ```
67
68mod request;
69mod response;
70mod set_handler;
71pub mod vacm;
72
73pub use vacm::{SecurityModel, VacmBuilder, VacmConfig, View, ViewCheckResult, ViewSubtree};
74
75use std::collections::HashMap;
76use std::net::SocketAddr;
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU32, Ordering};
79use std::time::Instant;
80
81use bytes::Bytes;
82use subtle::ConstantTimeEq;
83use tokio::net::UdpSocket;
84use tokio::sync::Semaphore;
85use tokio_util::sync::CancellationToken;
86use tracing::instrument;
87
88use std::io::IoSliceMut;
89
90use quinn_udp::{RecvMeta, Transmit, UdpSockRef, UdpSocketState};
91
92use crate::ber::Decoder;
93use crate::error::internal::DecodeErrorKind;
94use crate::error::{Error, ErrorStatus, Result};
95use crate::handler::{GetNextResult, GetResult, MibHandler, RequestContext};
96use crate::notification::UsmUserConfig;
97use crate::oid::Oid;
98use crate::pdu::{Pdu, PduType};
99use crate::util::bind_udp_socket;
100use crate::v3::SaltCounter;
101use crate::value::Value;
102use crate::varbind::VarBind;
103use crate::version::Version;
104
105/// Default maximum message size for UDP (RFC 3417 recommendation).
106const DEFAULT_MAX_MESSAGE_SIZE: usize = 1472;
107
108/// Overhead for SNMP message encoding (approximate conservative estimate).
109/// This accounts for version, community/USM, PDU headers, etc.
110const RESPONSE_OVERHEAD: usize = 100;
111
112/// Registered handler with its OID prefix.
113pub(crate) struct RegisteredHandler {
114    pub(crate) prefix: Oid,
115    pub(crate) handler: Arc<dyn MibHandler>,
116}
117
118/// Builder for [`Agent`].
119///
120/// Use this builder to configure and construct an SNMP agent. The builder
121/// pattern allows you to chain configuration methods before calling
122/// [`build()`](AgentBuilder::build) to create the agent.
123///
124/// # Minimal Example
125///
126/// ```rust,no_run
127/// use async_snmp::agent::Agent;
128/// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
129/// use async_snmp::{Oid, Value, VarBind, oid};
130/// use std::sync::Arc;
131///
132/// struct MyHandler;
133/// impl MibHandler for MyHandler {
134///     fn get<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetResult> {
135///         Box::pin(async { GetResult::NoSuchObject })
136///     }
137///     fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
138///         Box::pin(async { GetNextResult::EndOfMibView })
139///     }
140/// }
141///
142/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
143/// let agent = Agent::builder()
144///     .bind("0.0.0.0:1161")  // Use non-privileged port
145///     .community(b"public")
146///     .handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(MyHandler))
147///     .build()
148///     .await?;
149/// # Ok(())
150/// # }
151/// ```
152pub struct AgentBuilder {
153    bind_addr: String,
154    communities: Vec<Vec<u8>>,
155    usm_users: HashMap<Bytes, UsmUserConfig>,
156    handlers: Vec<RegisteredHandler>,
157    engine_id: Option<Vec<u8>>,
158    max_message_size: usize,
159    max_concurrent_requests: Option<usize>,
160    recv_buffer_size: Option<usize>,
161    vacm: Option<VacmConfig>,
162    cancel: Option<CancellationToken>,
163}
164
165impl AgentBuilder {
166    /// Create a new builder with default settings.
167    ///
168    /// Defaults:
169    /// - Bind address: `0.0.0.0:161` (UDP)
170    /// - Max message size: 1472 bytes (Ethernet MTU - IP/UDP headers)
171    /// - Max concurrent requests: 1000
172    /// - Receive buffer size: 4MB (requested from kernel)
173    /// - No communities or USM users (all requests rejected)
174    /// - No handlers registered
175    pub fn new() -> Self {
176        Self {
177            bind_addr: "0.0.0.0:161".to_string(),
178            communities: Vec::new(),
179            usm_users: HashMap::new(),
180            handlers: Vec::new(),
181            engine_id: None,
182            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
183            max_concurrent_requests: Some(1000),
184            recv_buffer_size: Some(4 * 1024 * 1024), // 4MB
185            vacm: None,
186            cancel: None,
187        }
188    }
189
190    /// Set the UDP bind address.
191    ///
192    /// Default is `0.0.0.0:161` (standard SNMP agent port). Note that binding
193    /// to UDP port 161 typically requires root/administrator privileges.
194    ///
195    /// # IPv4 Examples
196    ///
197    /// ```rust,no_run
198    /// use async_snmp::agent::Agent;
199    ///
200    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
201    /// // Bind to all IPv4 interfaces on standard port (requires privileges)
202    /// let agent = Agent::builder().bind("0.0.0.0:161").community(b"public").build().await?;
203    ///
204    /// // Bind to localhost only on non-privileged port
205    /// let agent = Agent::builder().bind("127.0.0.1:1161").community(b"public").build().await?;
206    ///
207    /// // Bind to specific interface
208    /// let agent = Agent::builder().bind("192.168.1.100:161").community(b"public").build().await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    ///
213    /// # IPv6 / Dual-Stack Examples
214    ///
215    /// ```rust,no_run
216    /// use async_snmp::agent::Agent;
217    ///
218    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
219    /// // Bind to all interfaces via dual-stack (handles both IPv4 and IPv6)
220    /// let agent = Agent::builder().bind("[::]:161").community(b"public").build().await?;
221    ///
222    /// // Bind to IPv6 localhost only
223    /// let agent = Agent::builder().bind("[::1]:1161").community(b"public").build().await?;
224    /// # Ok(())
225    /// # }
226    /// ```
227    pub fn bind(mut self, addr: impl Into<String>) -> Self {
228        self.bind_addr = addr.into();
229        self
230    }
231
232    /// Add an accepted community string for v1/v2c requests.
233    ///
234    /// Multiple communities can be added. If none are added,
235    /// all v1/v2c requests are rejected.
236    ///
237    /// # Example
238    ///
239    /// ```rust,no_run
240    /// use async_snmp::agent::Agent;
241    ///
242    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
243    /// let agent = Agent::builder()
244    ///     .bind("0.0.0.0:1161")
245    ///     .community(b"public")   // Read-only access
246    ///     .community(b"private")  // Read-write access (with VACM)
247    ///     .build()
248    ///     .await?;
249    /// # Ok(())
250    /// # }
251    /// ```
252    pub fn community(mut self, community: &[u8]) -> Self {
253        self.communities.push(community.to_vec());
254        self
255    }
256
257    /// Add multiple community strings.
258    ///
259    /// # Example
260    ///
261    /// ```rust,no_run
262    /// use async_snmp::agent::Agent;
263    ///
264    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
265    /// let communities = ["public", "private", "monitor"];
266    /// let agent = Agent::builder()
267    ///     .bind("0.0.0.0:1161")
268    ///     .communities(communities)
269    ///     .build()
270    ///     .await?;
271    /// # Ok(())
272    /// # }
273    /// ```
274    pub fn communities<I, C>(mut self, communities: I) -> Self
275    where
276        I: IntoIterator<Item = C>,
277        C: AsRef<[u8]>,
278    {
279        for c in communities {
280            self.communities.push(c.as_ref().to_vec());
281        }
282        self
283    }
284
285    /// Add a USM user for SNMPv3 authentication.
286    ///
287    /// Configure authentication and privacy settings using the closure.
288    /// Multiple users can be added with different security levels.
289    ///
290    /// # Security Levels
291    ///
292    /// - **noAuthNoPriv**: No authentication or encryption
293    /// - **authNoPriv**: Authentication only (HMAC verification)
294    /// - **authPriv**: Authentication and encryption
295    ///
296    /// # Example
297    ///
298    /// ```rust,no_run
299    /// use async_snmp::agent::Agent;
300    /// use async_snmp::{AuthProtocol, PrivProtocol};
301    ///
302    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
303    /// let agent = Agent::builder()
304    ///     .bind("0.0.0.0:1161")
305    ///     // Read-only user with authentication only
306    ///     .usm_user("monitor", |u| {
307    ///         u.auth(AuthProtocol::Sha256, b"monitorpass123")
308    ///     })
309    ///     // Admin user with full encryption
310    ///     .usm_user("admin", |u| {
311    ///         u.auth(AuthProtocol::Sha256, b"adminauth123")
312    ///          .privacy(PrivProtocol::Aes128, b"adminpriv123")
313    ///     })
314    ///     .build()
315    ///     .await?;
316    /// # Ok(())
317    /// # }
318    /// ```
319    pub fn usm_user<F>(mut self, username: impl Into<Bytes>, configure: F) -> Self
320    where
321        F: FnOnce(UsmUserConfig) -> UsmUserConfig,
322    {
323        let username_bytes: Bytes = username.into();
324        let config = configure(UsmUserConfig::new(username_bytes.clone()));
325        self.usm_users.insert(username_bytes, config);
326        self
327    }
328
329    /// Set the engine ID for SNMPv3.
330    ///
331    /// If not set, a default engine ID will be generated based on the
332    /// RFC 3411 format using enterprise number and timestamp.
333    ///
334    /// # Example
335    ///
336    /// ```rust,no_run
337    /// use async_snmp::agent::Agent;
338    ///
339    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
340    /// let agent = Agent::builder()
341    ///     .bind("0.0.0.0:1161")
342    ///     .engine_id(b"\x80\x00\x00\x00\x01MyEngine".to_vec())
343    ///     .community(b"public")
344    ///     .build()
345    ///     .await?;
346    /// # Ok(())
347    /// # }
348    /// ```
349    pub fn engine_id(mut self, engine_id: impl Into<Vec<u8>>) -> Self {
350        self.engine_id = Some(engine_id.into());
351        self
352    }
353
354    /// Set the maximum message size for responses.
355    ///
356    /// Default is 1472 octets (fits Ethernet MTU minus IP/UDP headers).
357    /// GETBULK responses will be truncated to fit within this limit.
358    ///
359    /// For SNMPv3 requests, the agent uses the minimum of this value
360    /// and the msgMaxSize from the request.
361    pub fn max_message_size(mut self, size: usize) -> Self {
362        self.max_message_size = size;
363        self
364    }
365
366    /// Set the maximum number of concurrent requests the agent will process.
367    ///
368    /// Default is 1000. Requests beyond this limit will queue until a slot
369    /// becomes available. Set to `None` for unbounded concurrency.
370    ///
371    /// This controls memory usage under high load while still allowing
372    /// parallel request processing.
373    pub fn max_concurrent_requests(mut self, limit: Option<usize>) -> Self {
374        self.max_concurrent_requests = limit;
375        self
376    }
377
378    /// Set the UDP socket receive buffer size.
379    ///
380    /// Default is 4MB. The kernel may cap this at `net.core.rmem_max`.
381    /// A larger buffer prevents packet loss during request bursts.
382    ///
383    /// Set to `None` to use the kernel default.
384    pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
385        self.recv_buffer_size = size;
386        self
387    }
388
389    /// Register a MIB handler for an OID subtree.
390    ///
391    /// Handlers are matched by longest prefix. When a request comes in,
392    /// the handler with the longest matching prefix is used.
393    ///
394    /// # Example
395    ///
396    /// ```rust,no_run
397    /// use async_snmp::agent::Agent;
398    /// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
399    /// use async_snmp::{Oid, Value, VarBind, oid};
400    /// use std::sync::Arc;
401    ///
402    /// struct SystemHandler;
403    /// impl MibHandler for SystemHandler {
404    ///     fn get<'a>(&'a self, _: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
405    ///         Box::pin(async move {
406    ///             if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
407    ///                 GetResult::Value(Value::OctetString("My Agent".into()))
408    ///             } else {
409    ///                 GetResult::NoSuchObject
410    ///             }
411    ///         })
412    ///     }
413    ///     fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
414    ///         Box::pin(async { GetNextResult::EndOfMibView })
415    ///     }
416    /// }
417    ///
418    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
419    /// let agent = Agent::builder()
420    ///     .bind("0.0.0.0:1161")
421    ///     .community(b"public")
422    ///     // Register handler for system MIB subtree
423    ///     .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemHandler))
424    ///     .build()
425    ///     .await?;
426    /// # Ok(())
427    /// # }
428    /// ```
429    pub fn handler(mut self, prefix: Oid, handler: Arc<dyn MibHandler>) -> Self {
430        self.handlers.push(RegisteredHandler { prefix, handler });
431        self
432    }
433
434    /// Configure VACM (View-based Access Control Model) using a builder function.
435    ///
436    /// When VACM is enabled, all requests are checked against the configured
437    /// access control rules. Requests that don't have proper access are rejected
438    /// with `noAccess` error (v2c/v3) or `noSuchName` (v1).
439    ///
440    /// # Example
441    ///
442    /// ```rust,no_run
443    /// use async_snmp::agent::{Agent, SecurityModel, VacmBuilder};
444    /// use async_snmp::message::SecurityLevel;
445    /// use async_snmp::oid;
446    ///
447    /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
448    /// let agent = Agent::builder()
449    ///     .bind("0.0.0.0:161")
450    ///     .community(b"public")
451    ///     .community(b"private")
452    ///     .vacm(|v| v
453    ///         .group("public", SecurityModel::V2c, "readonly_group")
454    ///         .group("private", SecurityModel::V2c, "readwrite_group")
455    ///         .access("readonly_group", |a| a
456    ///             .read_view("full_view"))
457    ///         .access("readwrite_group", |a| a
458    ///             .read_view("full_view")
459    ///             .write_view("write_view"))
460    ///         .view("full_view", |v| v
461    ///             .include(oid!(1, 3, 6, 1)))
462    ///         .view("write_view", |v| v
463    ///             .include(oid!(1, 3, 6, 1, 2, 1, 1))))
464    ///     .build()
465    ///     .await?;
466    /// # Ok(())
467    /// # }
468    /// ```
469    pub fn vacm<F>(mut self, configure: F) -> Self
470    where
471        F: FnOnce(VacmBuilder) -> VacmBuilder,
472    {
473        let builder = VacmBuilder::new();
474        self.vacm = Some(configure(builder).build());
475        self
476    }
477
478    /// Set a cancellation token for graceful shutdown.
479    ///
480    /// If not set, the agent creates its own token accessible via `Agent::cancel()`.
481    pub fn cancel(mut self, token: CancellationToken) -> Self {
482        self.cancel = Some(token);
483        self
484    }
485
486    /// Build the agent.
487    pub async fn build(mut self) -> Result<Agent> {
488        let bind_addr: std::net::SocketAddr = self.bind_addr.parse().map_err(|_| {
489            Error::Config(format!("invalid bind address: {}", self.bind_addr).into())
490        })?;
491
492        let socket = bind_udp_socket(bind_addr, self.recv_buffer_size)
493            .await
494            .map_err(|e| Error::Network {
495                target: bind_addr,
496                source: e,
497            })?;
498
499        let local_addr = socket.local_addr().map_err(|e| Error::Network {
500            target: bind_addr,
501            source: e,
502        })?;
503
504        let socket_state =
505            UdpSocketState::new(UdpSockRef::from(&socket)).map_err(|e| Error::Network {
506                target: bind_addr,
507                source: e,
508            })?;
509
510        // Generate default engine ID if not provided
511        let engine_id = self.engine_id.unwrap_or_else(|| {
512            // RFC 3411 format: enterprise number + format + local identifier
513            // Use a simple format: 0x80 (local) + timestamp + random
514            let mut id = vec![0x80, 0x00, 0x00, 0x00, 0x01]; // Enterprise format indicator
515            let timestamp = std::time::SystemTime::now()
516                .duration_since(std::time::UNIX_EPOCH)
517                .unwrap_or_default()
518                .as_secs();
519            id.extend_from_slice(&timestamp.to_be_bytes());
520            id
521        });
522
523        // Sort handlers by prefix length (longest first) for matching
524        self.handlers
525            .sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
526
527        let cancel = self.cancel.unwrap_or_default();
528
529        // Create concurrency limiter if configured
530        let concurrency_limit = self
531            .max_concurrent_requests
532            .map(|n| Arc::new(Semaphore::new(n)));
533
534        Ok(Agent {
535            inner: Arc::new(AgentInner {
536                socket: Arc::new(socket),
537                socket_state,
538                local_addr,
539                communities: self.communities,
540                usm_users: self.usm_users,
541                handlers: self.handlers,
542                engine_id,
543                engine_boots: AtomicU32::new(1),
544                engine_time: AtomicU32::new(0),
545                engine_start: Instant::now(),
546                salt_counter: SaltCounter::new(),
547                max_message_size: self.max_message_size,
548                concurrency_limit,
549                vacm: self.vacm,
550                snmp_invalid_msgs: AtomicU32::new(0),
551                snmp_unknown_security_models: AtomicU32::new(0),
552                snmp_silent_drops: AtomicU32::new(0),
553                cancel,
554            }),
555        })
556    }
557}
558
559impl Default for AgentBuilder {
560    fn default() -> Self {
561        Self::new()
562    }
563}
564
565/// Inner state shared across agent clones.
566pub(crate) struct AgentInner {
567    pub(crate) socket: Arc<UdpSocket>,
568    pub(crate) socket_state: UdpSocketState,
569    pub(crate) local_addr: SocketAddr,
570    pub(crate) communities: Vec<Vec<u8>>,
571    pub(crate) usm_users: HashMap<Bytes, UsmUserConfig>,
572    pub(crate) handlers: Vec<RegisteredHandler>,
573    pub(crate) engine_id: Vec<u8>,
574    pub(crate) engine_boots: AtomicU32,
575    pub(crate) engine_time: AtomicU32,
576    pub(crate) engine_start: Instant,
577    pub(crate) salt_counter: SaltCounter,
578    pub(crate) max_message_size: usize,
579    pub(crate) concurrency_limit: Option<Arc<Semaphore>>,
580    pub(crate) vacm: Option<VacmConfig>,
581    // RFC 3412 statistics counters
582    /// snmpInvalidMsgs (1.3.6.1.6.3.11.2.1.2) - messages with invalid msgFlags
583    /// (e.g., privacy without authentication)
584    pub(crate) snmp_invalid_msgs: AtomicU32,
585    /// snmpUnknownSecurityModels (1.3.6.1.6.3.11.2.1.1) - messages with
586    /// unrecognized security model
587    pub(crate) snmp_unknown_security_models: AtomicU32,
588    /// snmpSilentDrops (1.3.6.1.6.3.11.2.1.3) - confirmed-class PDUs silently
589    /// dropped because even an empty response would exceed max message size
590    pub(crate) snmp_silent_drops: AtomicU32,
591    /// Cancellation token for graceful shutdown.
592    pub(crate) cancel: CancellationToken,
593}
594
595/// SNMP Agent.
596///
597/// Listens for and responds to SNMP requests (GET, GETNEXT, GETBULK, SET).
598///
599/// # Example
600///
601/// ```rust,no_run
602/// use async_snmp::agent::Agent;
603/// use async_snmp::oid;
604///
605/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
606/// let agent = Agent::builder()
607///     .bind("0.0.0.0:161")
608///     .community(b"public")
609///     .build()
610///     .await?;
611///
612/// agent.run().await
613/// # }
614/// ```
615pub struct Agent {
616    pub(crate) inner: Arc<AgentInner>,
617}
618
619impl Agent {
620    /// Create a builder for configuring the agent.
621    pub fn builder() -> AgentBuilder {
622        AgentBuilder::new()
623    }
624
625    /// Get the local address the agent is bound to.
626    pub fn local_addr(&self) -> SocketAddr {
627        self.inner.local_addr
628    }
629
630    /// Get the engine ID.
631    pub fn engine_id(&self) -> &[u8] {
632        &self.inner.engine_id
633    }
634
635    /// Get the cancellation token for this agent.
636    ///
637    /// Call `token.cancel()` to initiate graceful shutdown.
638    pub fn cancel(&self) -> CancellationToken {
639        self.inner.cancel.clone()
640    }
641
642    /// Get the snmpInvalidMsgs counter value.
643    ///
644    /// This counter tracks messages with invalid msgFlags, such as
645    /// privacy-without-authentication (RFC 3412 Section 7.2 Step 5d).
646    ///
647    /// OID: 1.3.6.1.6.3.11.2.1.2
648    pub fn snmp_invalid_msgs(&self) -> u32 {
649        self.inner.snmp_invalid_msgs.load(Ordering::Relaxed)
650    }
651
652    /// Get the snmpUnknownSecurityModels counter value.
653    ///
654    /// This counter tracks messages with unrecognized security models
655    /// (RFC 3412 Section 7.2 Step 2).
656    ///
657    /// OID: 1.3.6.1.6.3.11.2.1.1
658    pub fn snmp_unknown_security_models(&self) -> u32 {
659        self.inner
660            .snmp_unknown_security_models
661            .load(Ordering::Relaxed)
662    }
663
664    /// Get the snmpSilentDrops counter value.
665    ///
666    /// This counter tracks confirmed-class PDUs (GetRequest, GetNextRequest,
667    /// GetBulkRequest, SetRequest, InformRequest) that were silently dropped
668    /// because even an empty Response-PDU would exceed the maximum message
669    /// size constraint (RFC 3412 Section 7.1).
670    ///
671    /// OID: 1.3.6.1.6.3.11.2.1.3
672    pub fn snmp_silent_drops(&self) -> u32 {
673        self.inner.snmp_silent_drops.load(Ordering::Relaxed)
674    }
675
676    /// Run the agent, processing requests concurrently.
677    ///
678    /// Requests are processed in parallel up to the configured
679    /// `max_concurrent_requests` limit (default: 1000). This method runs
680    /// until the cancellation token is triggered.
681    #[instrument(skip(self), err, fields(snmp.local_addr = %self.local_addr()))]
682    pub async fn run(&self) -> Result<()> {
683        let mut buf = vec![0u8; 65535];
684
685        loop {
686            let recv_meta = tokio::select! {
687                result = self.recv_packet(&mut buf) => {
688                    result?
689                }
690                _ = self.inner.cancel.cancelled() => {
691                    tracing::info!(target: "async_snmp::agent", "agent shutdown requested");
692                    return Ok(());
693                }
694            };
695
696            let data = Bytes::copy_from_slice(&buf[..recv_meta.len]);
697            let agent = self.clone();
698
699            let permit = if let Some(ref sem) = self.inner.concurrency_limit {
700                Some(sem.clone().acquire_owned().await.expect("semaphore closed"))
701            } else {
702                None
703            };
704
705            tokio::spawn(async move {
706                agent.update_engine_time();
707
708                match agent.handle_request(data, recv_meta.addr).await {
709                    Ok(Some(response_bytes)) => {
710                        if let Err(e) = agent.send_response(&response_bytes, &recv_meta).await {
711                            tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "failed to send response");
712                        }
713                    }
714                    Ok(None) => {}
715                    Err(e) => {
716                        tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "error handling request");
717                    }
718                }
719
720                drop(permit);
721            });
722        }
723    }
724
725    async fn recv_packet(&self, buf: &mut [u8]) -> Result<RecvMeta> {
726        let mut iov = [IoSliceMut::new(buf)];
727        let mut meta = [RecvMeta::default()];
728
729        loop {
730            self.inner
731                .socket
732                .readable()
733                .await
734                .map_err(|e| Error::Network {
735                    target: self.inner.local_addr,
736                    source: e,
737                })?;
738
739            let result = self.inner.socket.try_io(tokio::io::Interest::READABLE, || {
740                let sref = UdpSockRef::from(&*self.inner.socket);
741                self.inner.socket_state.recv(sref, &mut iov, &mut meta)
742            });
743
744            match result {
745                Ok(n) if n > 0 => return Ok(meta[0]),
746                Ok(_) => continue,
747                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
748                Err(e) => {
749                    return Err(Error::Network {
750                        target: self.inner.local_addr,
751                        source: e,
752                    }
753                    .boxed());
754                }
755            }
756        }
757    }
758
759    async fn send_response(&self, data: &[u8], recv_meta: &RecvMeta) -> std::io::Result<()> {
760        let transmit = Transmit {
761            destination: recv_meta.addr,
762            ecn: None,
763            contents: data,
764            segment_size: None,
765            src_ip: recv_meta.dst_ip,
766        };
767
768        loop {
769            self.inner.socket.writable().await?;
770
771            let result = self.inner.socket.try_io(tokio::io::Interest::WRITABLE, || {
772                let sref = UdpSockRef::from(&*self.inner.socket);
773                self.inner.socket_state.try_send(sref, &transmit)
774            });
775
776            match result {
777                Ok(()) => return Ok(()),
778                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
779                Err(e) => return Err(e),
780            }
781        }
782    }
783
784    /// Process a single request and return the response bytes.
785    ///
786    /// Returns `None` if no response should be sent.
787    async fn handle_request(&self, data: Bytes, source: SocketAddr) -> Result<Option<Bytes>> {
788        // Peek at version
789        let mut decoder = Decoder::with_target(data.clone(), source);
790        let mut seq = decoder.read_sequence()?;
791        let version_num = seq.read_integer()?;
792        let version = Version::from_i32(version_num).ok_or_else(|| {
793            tracing::debug!(target: "async_snmp::agent", { source = %source, kind = %DecodeErrorKind::UnknownVersion(version_num) }, "unknown SNMP version");
794            Error::MalformedResponse { target: source }.boxed()
795        })?;
796        drop(seq);
797        drop(decoder);
798
799        match version {
800            Version::V1 => self.handle_v1(data, source).await,
801            Version::V2c => self.handle_v2c(data, source).await,
802            Version::V3 => self.handle_v3(data, source).await,
803        }
804    }
805
806    /// Update engine time based on elapsed time since start.
807    fn update_engine_time(&self) {
808        let elapsed = self.inner.engine_start.elapsed().as_secs() as u32;
809        self.inner.engine_time.store(elapsed, Ordering::Relaxed);
810    }
811
812    /// Validate community string using constant-time comparison.
813    ///
814    /// Uses constant-time comparison to prevent timing attacks that could
815    /// be used to guess valid community strings character by character.
816    pub(crate) fn validate_community(&self, community: &[u8]) -> bool {
817        if self.inner.communities.is_empty() {
818            // No communities configured = reject all
819            return false;
820        }
821        // Use constant-time comparison for each community string.
822        // We compare against all configured communities regardless of
823        // early matches to maintain constant-time behavior.
824        let mut valid = false;
825        for configured in &self.inner.communities {
826            // ct_eq returns a Choice, which we convert to bool after comparison
827            if configured.len() == community.len()
828                && bool::from(configured.as_slice().ct_eq(community))
829            {
830                valid = true;
831            }
832        }
833        valid
834    }
835
836    /// Dispatch a request to the appropriate handler.
837    async fn dispatch_request(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
838        match pdu.pdu_type {
839            PduType::GetRequest => self.handle_get(ctx, pdu).await,
840            PduType::GetNextRequest => self.handle_get_next(ctx, pdu).await,
841            PduType::GetBulkRequest => self.handle_get_bulk(ctx, pdu).await,
842            PduType::SetRequest => self.handle_set(ctx, pdu).await,
843            PduType::InformRequest => self.handle_inform(pdu),
844            _ => {
845                // Should not happen - filtered earlier
846                Ok(pdu.to_error_response(ErrorStatus::GenErr, 0))
847            }
848        }
849    }
850
851    /// Handle InformRequest PDU.
852    ///
853    /// Per RFC 3416 Section 4.2.7, an InformRequest is a confirmed-class PDU
854    /// that the receiver acknowledges by returning a Response with the same
855    /// request-id and varbind list.
856    fn handle_inform(&self, pdu: &Pdu) -> Result<Pdu> {
857        // Simply acknowledge by returning the same varbinds in a Response
858        Ok(Pdu {
859            pdu_type: PduType::Response,
860            request_id: pdu.request_id,
861            error_status: 0,
862            error_index: 0,
863            varbinds: pdu.varbinds.clone(),
864        })
865    }
866
867    /// Handle GET request.
868    async fn handle_get(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
869        let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
870
871        for (index, vb) in pdu.varbinds.iter().enumerate() {
872            // VACM read access check
873            if let Some(ref vacm) = self.inner.vacm
874                && !vacm.check_access(ctx.read_view.as_ref(), &vb.oid)
875            {
876                // v1: noSuchName, v2c/v3: noAccess or NoSuchObject
877                if ctx.version == Version::V1 {
878                    return Ok(Pdu {
879                        pdu_type: PduType::Response,
880                        request_id: pdu.request_id,
881                        error_status: ErrorStatus::NoSuchName.as_i32(),
882                        error_index: (index + 1) as i32,
883                        varbinds: pdu.varbinds.clone(),
884                    });
885                } else {
886                    // For GET, return NoSuchObject for inaccessible OIDs per RFC 3415
887                    response_varbinds.push(VarBind::new(vb.oid.clone(), Value::NoSuchObject));
888                    continue;
889                }
890            }
891
892            let result = if let Some(handler) = self.find_handler(&vb.oid) {
893                handler.handler.get(ctx, &vb.oid).await
894            } else {
895                GetResult::NoSuchObject
896            };
897
898            let response_value = match result {
899                GetResult::Value(v) => v,
900                GetResult::NoSuchObject => {
901                    // v1 returns noSuchName error, v2c/v3 returns NoSuchObject exception
902                    if ctx.version == Version::V1 {
903                        return Ok(Pdu {
904                            pdu_type: PduType::Response,
905                            request_id: pdu.request_id,
906                            error_status: ErrorStatus::NoSuchName.as_i32(),
907                            error_index: (index + 1) as i32,
908                            varbinds: pdu.varbinds.clone(),
909                        });
910                    } else {
911                        Value::NoSuchObject
912                    }
913                }
914                GetResult::NoSuchInstance => {
915                    // v1 returns noSuchName error, v2c/v3 returns NoSuchInstance exception
916                    if ctx.version == Version::V1 {
917                        return Ok(Pdu {
918                            pdu_type: PduType::Response,
919                            request_id: pdu.request_id,
920                            error_status: ErrorStatus::NoSuchName.as_i32(),
921                            error_index: (index + 1) as i32,
922                            varbinds: pdu.varbinds.clone(),
923                        });
924                    } else {
925                        Value::NoSuchInstance
926                    }
927                }
928            };
929
930            response_varbinds.push(VarBind::new(vb.oid.clone(), response_value));
931        }
932
933        Ok(Pdu {
934            pdu_type: PduType::Response,
935            request_id: pdu.request_id,
936            error_status: 0,
937            error_index: 0,
938            varbinds: response_varbinds,
939        })
940    }
941
942    /// Handle GETNEXT request.
943    async fn handle_get_next(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
944        let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
945
946        for (index, vb) in pdu.varbinds.iter().enumerate() {
947            // Try to find the next OID from any handler
948            let next = self.get_next_oid(ctx, &vb.oid).await;
949
950            // Check VACM access for the returned OID (if VACM enabled)
951            let next = if let Some(ref next_vb) = next {
952                if let Some(ref vacm) = self.inner.vacm {
953                    if vacm.check_access(ctx.read_view.as_ref(), &next_vb.oid) {
954                        next
955                    } else {
956                        // OID not accessible, continue searching
957                        // For simplicity, we just return EndOfMibView here
958                        // A more complete implementation would continue the search
959                        None
960                    }
961                } else {
962                    next
963                }
964            } else {
965                next
966            };
967
968            match next {
969                Some(next_vb) => {
970                    response_varbinds.push(next_vb);
971                }
972                None => {
973                    // v1 returns noSuchName, v2c/v3 returns endOfMibView
974                    if ctx.version == Version::V1 {
975                        return Ok(Pdu {
976                            pdu_type: PduType::Response,
977                            request_id: pdu.request_id,
978                            error_status: ErrorStatus::NoSuchName.as_i32(),
979                            error_index: (index + 1) as i32,
980                            varbinds: pdu.varbinds.clone(),
981                        });
982                    } else {
983                        response_varbinds.push(VarBind::new(vb.oid.clone(), Value::EndOfMibView));
984                    }
985                }
986            }
987        }
988
989        Ok(Pdu {
990            pdu_type: PduType::Response,
991            request_id: pdu.request_id,
992            error_status: 0,
993            error_index: 0,
994            varbinds: response_varbinds,
995        })
996    }
997
998    /// Handle GETBULK request.
999    ///
1000    /// Per RFC 3416 Section 4.2.3, if the response would exceed the message
1001    /// size limit, we return fewer variable bindings rather than all of them.
1002    async fn handle_get_bulk(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
1003        // For GETBULK, error_status is non_repeaters and error_index is max_repetitions
1004        let non_repeaters = pdu.error_status.max(0) as usize;
1005        let max_repetitions = pdu.error_index.max(0) as usize;
1006
1007        let mut response_varbinds = Vec::new();
1008        let mut current_size: usize = RESPONSE_OVERHEAD;
1009        let max_size = self.inner.max_message_size;
1010
1011        // Helper to check if we can add a varbind
1012        let can_add = |vb: &VarBind, current_size: usize| -> bool {
1013            current_size + vb.encoded_size() <= max_size
1014        };
1015
1016        // Handle non-repeaters (first N varbinds get one GETNEXT each)
1017        for vb in pdu.varbinds.iter().take(non_repeaters) {
1018            let next_vb = match self.get_next_oid(ctx, &vb.oid).await {
1019                Some(next_vb) => next_vb,
1020                None => VarBind::new(vb.oid.clone(), Value::EndOfMibView),
1021            };
1022
1023            if !can_add(&next_vb, current_size) {
1024                // Can't fit even non-repeaters, return tooBig if we have nothing
1025                if response_varbinds.is_empty() {
1026                    return Ok(Pdu {
1027                        pdu_type: PduType::Response,
1028                        request_id: pdu.request_id,
1029                        error_status: ErrorStatus::TooBig.as_i32(),
1030                        error_index: 0,
1031                        varbinds: pdu.varbinds.clone(),
1032                    });
1033                }
1034                // Otherwise return what we have
1035                break;
1036            }
1037
1038            current_size += next_vb.encoded_size();
1039            response_varbinds.push(next_vb);
1040        }
1041
1042        // Handle repeaters
1043        if non_repeaters < pdu.varbinds.len() {
1044            let repeaters = &pdu.varbinds[non_repeaters..];
1045            let mut current_oids: Vec<Oid> = repeaters.iter().map(|vb| vb.oid.clone()).collect();
1046            let mut all_done = vec![false; repeaters.len()];
1047
1048            'outer: for _ in 0..max_repetitions {
1049                let mut row_complete = true;
1050                for (i, oid) in current_oids.iter_mut().enumerate() {
1051                    let next_vb = if all_done[i] {
1052                        VarBind::new(oid.clone(), Value::EndOfMibView)
1053                    } else {
1054                        match self.get_next_oid(ctx, oid).await {
1055                            Some(next_vb) => {
1056                                *oid = next_vb.oid.clone();
1057                                row_complete = false;
1058                                next_vb
1059                            }
1060                            None => {
1061                                all_done[i] = true;
1062                                VarBind::new(oid.clone(), Value::EndOfMibView)
1063                            }
1064                        }
1065                    };
1066
1067                    // Check size before adding
1068                    if !can_add(&next_vb, current_size) {
1069                        // Can't fit more, return what we have
1070                        break 'outer;
1071                    }
1072
1073                    current_size += next_vb.encoded_size();
1074                    response_varbinds.push(next_vb);
1075                }
1076
1077                if row_complete {
1078                    break;
1079                }
1080            }
1081        }
1082
1083        Ok(Pdu {
1084            pdu_type: PduType::Response,
1085            request_id: pdu.request_id,
1086            error_status: 0,
1087            error_index: 0,
1088            varbinds: response_varbinds,
1089        })
1090    }
1091
1092    /// Find the handler for a given OID.
1093    pub(crate) fn find_handler(&self, oid: &Oid) -> Option<&RegisteredHandler> {
1094        // Handlers are sorted by prefix length (longest first)
1095        self.inner
1096            .handlers
1097            .iter()
1098            .find(|&handler| handler.handler.handles(&handler.prefix, oid))
1099            .map(|v| v as _)
1100    }
1101
1102    /// Get the next OID from any handler.
1103    async fn get_next_oid(&self, ctx: &RequestContext, oid: &Oid) -> Option<VarBind> {
1104        // Find the first handler that can provide a next OID
1105        let mut best_result: Option<VarBind> = None;
1106
1107        for handler in &self.inner.handlers {
1108            if let GetNextResult::Value(next) = handler.handler.get_next(ctx, oid).await {
1109                // Must be lexicographically greater than the request OID
1110                if next.oid > *oid {
1111                    match &best_result {
1112                        None => best_result = Some(next),
1113                        Some(current) if next.oid < current.oid => best_result = Some(next),
1114                        _ => {}
1115                    }
1116                }
1117            }
1118        }
1119
1120        best_result
1121    }
1122}
1123
1124impl Clone for Agent {
1125    fn clone(&self) -> Self {
1126        Self {
1127            inner: Arc::clone(&self.inner),
1128        }
1129    }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134    use super::*;
1135    use crate::handler::{
1136        BoxFuture, GetNextResult, GetResult, MibHandler, RequestContext, SecurityModel, SetResult,
1137    };
1138    use crate::message::SecurityLevel;
1139    use crate::oid;
1140
1141    struct TestHandler;
1142
1143    impl MibHandler for TestHandler {
1144        fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
1145            Box::pin(async move {
1146                if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0) {
1147                    return GetResult::Value(Value::Integer(42));
1148                }
1149                if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0) {
1150                    return GetResult::Value(Value::OctetString(Bytes::from_static(b"test")));
1151                }
1152                GetResult::NoSuchObject
1153            })
1154        }
1155
1156        fn get_next<'a>(
1157            &'a self,
1158            _ctx: &'a RequestContext,
1159            oid: &'a Oid,
1160        ) -> BoxFuture<'a, GetNextResult> {
1161            Box::pin(async move {
1162                let oid1 = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
1163                let oid2 = oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0);
1164
1165                if oid < &oid1 {
1166                    return GetNextResult::Value(VarBind::new(oid1, Value::Integer(42)));
1167                }
1168                if oid < &oid2 {
1169                    return GetNextResult::Value(VarBind::new(
1170                        oid2,
1171                        Value::OctetString(Bytes::from_static(b"test")),
1172                    ));
1173                }
1174                GetNextResult::EndOfMibView
1175            })
1176        }
1177    }
1178
1179    fn test_ctx() -> RequestContext {
1180        RequestContext {
1181            source: "127.0.0.1:12345".parse().unwrap(),
1182            version: Version::V2c,
1183            security_model: SecurityModel::V2c,
1184            security_name: Bytes::from_static(b"public"),
1185            security_level: SecurityLevel::NoAuthNoPriv,
1186            context_name: Bytes::new(),
1187            request_id: 1,
1188            pdu_type: PduType::GetRequest,
1189            group_name: None,
1190            read_view: None,
1191            write_view: None,
1192        }
1193    }
1194
1195    #[test]
1196    fn test_agent_builder_defaults() {
1197        let builder = AgentBuilder::new();
1198        assert_eq!(builder.bind_addr, "0.0.0.0:161");
1199        assert!(builder.communities.is_empty());
1200        assert!(builder.usm_users.is_empty());
1201        assert!(builder.handlers.is_empty());
1202    }
1203
1204    #[test]
1205    fn test_agent_builder_community() {
1206        let builder = AgentBuilder::new()
1207            .community(b"public")
1208            .community(b"private");
1209        assert_eq!(builder.communities.len(), 2);
1210    }
1211
1212    #[test]
1213    fn test_agent_builder_communities() {
1214        let builder = AgentBuilder::new().communities(["public", "private"]);
1215        assert_eq!(builder.communities.len(), 2);
1216    }
1217
1218    #[test]
1219    fn test_agent_builder_handler() {
1220        let builder =
1221            AgentBuilder::new().handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(TestHandler));
1222        assert_eq!(builder.handlers.len(), 1);
1223    }
1224
1225    #[tokio::test]
1226    async fn test_mib_handler_default_set() {
1227        let handler = TestHandler;
1228        let mut ctx = test_ctx();
1229        ctx.pdu_type = PduType::SetRequest;
1230
1231        let result = handler
1232            .test_set(&ctx, &oid!(1, 3, 6, 1), &Value::Integer(1))
1233            .await;
1234        assert_eq!(result, SetResult::NotWritable);
1235    }
1236
1237    #[test]
1238    fn test_mib_handler_handles() {
1239        let handler = TestHandler;
1240        let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);
1241
1242        // OID within prefix
1243        assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0)));
1244
1245        // OID before prefix (GETNEXT should still try)
1246        assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99998)));
1247
1248        // OID after prefix (not handled)
1249        assert!(!handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 100000)));
1250    }
1251
1252    #[tokio::test]
1253    async fn test_test_handler_get() {
1254        let handler = TestHandler;
1255        let ctx = test_ctx();
1256
1257        // Existing OID
1258        let result = handler
1259            .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1260            .await;
1261        assert!(matches!(result, GetResult::Value(Value::Integer(42))));
1262
1263        // Non-existing OID
1264        let result = handler
1265            .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 99, 0))
1266            .await;
1267        assert!(matches!(result, GetResult::NoSuchObject));
1268    }
1269
1270    #[tokio::test]
1271    async fn test_test_handler_get_next() {
1272        let handler = TestHandler;
1273        let mut ctx = test_ctx();
1274        ctx.pdu_type = PduType::GetNextRequest;
1275
1276        // Before first OID
1277        let next = handler.get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999)).await;
1278        assert!(next.is_value());
1279        if let GetNextResult::Value(vb) = next {
1280            assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0));
1281        }
1282
1283        // Between OIDs
1284        let next = handler
1285            .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1286            .await;
1287        assert!(next.is_value());
1288        if let GetNextResult::Value(vb) = next {
1289            assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0));
1290        }
1291
1292        // After last OID
1293        let next = handler
1294            .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0))
1295            .await;
1296        assert!(next.is_end_of_mib_view());
1297    }
1298}