async_snmp/agent/mod.rs
1//! SNMP Agent (RFC 3413).
2//!
3//! This module provides SNMP agent functionality for responding to
4//! GET, GETNEXT, GETBULK, and SET requests.
5//!
6//! # Features
7//!
8//! - **Async handlers**: All handler methods are async for database queries, network calls, etc.
9//! - **Atomic SET**: Two-phase commit protocol (test/commit/undo) per RFC 3416
10//! - **VACM support**: Optional View-based Access Control Model (RFC 3415)
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use async_snmp::agent::Agent;
16//! use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
17//! use async_snmp::{Oid, Value, VarBind, oid};
18//! use std::sync::Arc;
19//!
20//! // Define a simple handler for the system MIB subtree
21//! struct SystemMibHandler;
22//!
23//! impl MibHandler for SystemMibHandler {
24//! fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
25//! Box::pin(async move {
26//! // sysDescr.0
27//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
28//! return GetResult::Value(Value::OctetString("My SNMP Agent".into()));
29//! }
30//! // sysObjectID.0
31//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 2, 0) {
32//! return GetResult::Value(Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999)));
33//! }
34//! GetResult::NoSuchObject
35//! })
36//! }
37//!
38//! fn get_next<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetNextResult> {
39//! Box::pin(async move {
40//! // Return the lexicographically next OID after the given one
41//! let sys_descr = oid!(1, 3, 6, 1, 2, 1, 1, 1, 0);
42//! let sys_object_id = oid!(1, 3, 6, 1, 2, 1, 1, 2, 0);
43//!
44//! if oid < &sys_descr {
45//! return GetNextResult::Value(VarBind::new(sys_descr, Value::OctetString("My SNMP Agent".into())));
46//! }
47//! if oid < &sys_object_id {
48//! return GetNextResult::Value(VarBind::new(sys_object_id, Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999))));
49//! }
50//! GetNextResult::EndOfMibView
51//! })
52//! }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Box<async_snmp::Error>> {
57//! let agent = Agent::builder()
58//! .bind("0.0.0.0:161")
59//! .community(b"public")
60//! .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemMibHandler))
61//! .build()
62//! .await?;
63//!
64//! agent.run().await
65//! }
66//! ```
67
68mod request;
69mod response;
70mod set_handler;
71pub mod vacm;
72
73pub use vacm::{SecurityModel, VacmBuilder, VacmConfig, View, ViewCheckResult, ViewSubtree};
74
75use std::collections::HashMap;
76use std::net::SocketAddr;
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU32, Ordering};
79use std::time::Instant;
80
81use bytes::Bytes;
82use subtle::ConstantTimeEq;
83use tokio::net::UdpSocket;
84use tokio::sync::Semaphore;
85use tokio_util::sync::CancellationToken;
86use tracing::instrument;
87
88use std::io::IoSliceMut;
89
90use quinn_udp::{RecvMeta, Transmit, UdpSockRef, UdpSocketState};
91
92use crate::ber::Decoder;
93use crate::error::internal::DecodeErrorKind;
94use crate::error::{Error, ErrorStatus, Result};
95use crate::handler::{GetNextResult, GetResult, MibHandler, RequestContext};
96use crate::notification::UsmUserConfig;
97use crate::oid::Oid;
98use crate::pdu::{Pdu, PduType};
99use crate::util::bind_udp_socket;
100use crate::v3::SaltCounter;
101use crate::value::Value;
102use crate::varbind::VarBind;
103use crate::version::Version;
104
105/// Default maximum message size for UDP (RFC 3417 recommendation).
106const DEFAULT_MAX_MESSAGE_SIZE: usize = 1472;
107
108/// Overhead for SNMP message encoding (approximate conservative estimate).
109/// This accounts for version, community/USM, PDU headers, etc.
110const RESPONSE_OVERHEAD: usize = 100;
111
112/// Registered handler with its OID prefix.
113pub(crate) struct RegisteredHandler {
114 pub(crate) prefix: Oid,
115 pub(crate) handler: Arc<dyn MibHandler>,
116}
117
118/// Builder for [`Agent`].
119///
120/// Use this builder to configure and construct an SNMP agent. The builder
121/// pattern allows you to chain configuration methods before calling
122/// [`build()`](AgentBuilder::build) to create the agent.
123///
124/// # Access Control
125///
126/// By default, the agent operates in **permissive mode**: any authenticated
127/// request (valid community string for v1/v2c, valid USM credentials for v3)
128/// has full read and write access to all registered handlers.
129///
130/// For production deployments, use the [`vacm()`](AgentBuilder::vacm) method
131/// to configure View-based Access Control (RFC 3415), which allows fine-grained
132/// control over which security names can access which OID subtrees.
133///
134/// # Minimal Example
135///
136/// ```rust,no_run
137/// use async_snmp::agent::Agent;
138/// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
139/// use async_snmp::{Oid, Value, VarBind, oid};
140/// use std::sync::Arc;
141///
142/// struct MyHandler;
143/// impl MibHandler for MyHandler {
144/// fn get<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetResult> {
145/// Box::pin(async { GetResult::NoSuchObject })
146/// }
147/// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
148/// Box::pin(async { GetNextResult::EndOfMibView })
149/// }
150/// }
151///
152/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
153/// let agent = Agent::builder()
154/// .bind("0.0.0.0:1161") // Use non-privileged port
155/// .community(b"public")
156/// .handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(MyHandler))
157/// .build()
158/// .await?;
159/// # Ok(())
160/// # }
161/// ```
162pub struct AgentBuilder {
163 bind_addr: String,
164 communities: Vec<Vec<u8>>,
165 usm_users: HashMap<Bytes, UsmUserConfig>,
166 handlers: Vec<RegisteredHandler>,
167 engine_id: Option<Vec<u8>>,
168 max_message_size: usize,
169 max_concurrent_requests: Option<usize>,
170 recv_buffer_size: Option<usize>,
171 vacm: Option<VacmConfig>,
172 cancel: Option<CancellationToken>,
173}
174
175impl AgentBuilder {
176 /// Create a new builder with default settings.
177 ///
178 /// Defaults:
179 /// - Bind address: `0.0.0.0:161` (UDP)
180 /// - Max message size: 1472 bytes (Ethernet MTU - IP/UDP headers)
181 /// - Max concurrent requests: 1000
182 /// - Receive buffer size: 4MB (requested from kernel)
183 /// - No communities or USM users (all requests rejected)
184 /// - No handlers registered
185 pub fn new() -> Self {
186 Self {
187 bind_addr: "0.0.0.0:161".to_string(),
188 communities: Vec::new(),
189 usm_users: HashMap::new(),
190 handlers: Vec::new(),
191 engine_id: None,
192 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
193 max_concurrent_requests: Some(1000),
194 recv_buffer_size: Some(4 * 1024 * 1024), // 4MB
195 vacm: None,
196 cancel: None,
197 }
198 }
199
200 /// Set the UDP bind address.
201 ///
202 /// Default is `0.0.0.0:161` (standard SNMP agent port). Note that binding
203 /// to UDP port 161 typically requires root/administrator privileges.
204 ///
205 /// # IPv4 Examples
206 ///
207 /// ```rust,no_run
208 /// use async_snmp::agent::Agent;
209 ///
210 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
211 /// // Bind to all IPv4 interfaces on standard port (requires privileges)
212 /// let agent = Agent::builder().bind("0.0.0.0:161").community(b"public").build().await?;
213 ///
214 /// // Bind to localhost only on non-privileged port
215 /// let agent = Agent::builder().bind("127.0.0.1:1161").community(b"public").build().await?;
216 ///
217 /// // Bind to specific interface
218 /// let agent = Agent::builder().bind("192.168.1.100:161").community(b"public").build().await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 ///
223 /// # IPv6 / Dual-Stack Examples
224 ///
225 /// ```rust,no_run
226 /// use async_snmp::agent::Agent;
227 ///
228 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
229 /// // Bind to all interfaces via dual-stack (handles both IPv4 and IPv6)
230 /// let agent = Agent::builder().bind("[::]:161").community(b"public").build().await?;
231 ///
232 /// // Bind to IPv6 localhost only
233 /// let agent = Agent::builder().bind("[::1]:1161").community(b"public").build().await?;
234 /// # Ok(())
235 /// # }
236 /// ```
237 pub fn bind(mut self, addr: impl Into<String>) -> Self {
238 self.bind_addr = addr.into();
239 self
240 }
241
242 /// Add an accepted community string for v1/v2c requests.
243 ///
244 /// Multiple communities can be added. If none are added,
245 /// all v1/v2c requests are rejected.
246 ///
247 /// # Example
248 ///
249 /// ```rust,no_run
250 /// use async_snmp::agent::Agent;
251 ///
252 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
253 /// let agent = Agent::builder()
254 /// .bind("0.0.0.0:1161")
255 /// .community(b"public") // Read-only access
256 /// .community(b"private") // Read-write access (with VACM)
257 /// .build()
258 /// .await?;
259 /// # Ok(())
260 /// # }
261 /// ```
262 pub fn community(mut self, community: &[u8]) -> Self {
263 self.communities.push(community.to_vec());
264 self
265 }
266
267 /// Add multiple community strings.
268 ///
269 /// # Example
270 ///
271 /// ```rust,no_run
272 /// use async_snmp::agent::Agent;
273 ///
274 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
275 /// let communities = ["public", "private", "monitor"];
276 /// let agent = Agent::builder()
277 /// .bind("0.0.0.0:1161")
278 /// .communities(communities)
279 /// .build()
280 /// .await?;
281 /// # Ok(())
282 /// # }
283 /// ```
284 pub fn communities<I, C>(mut self, communities: I) -> Self
285 where
286 I: IntoIterator<Item = C>,
287 C: AsRef<[u8]>,
288 {
289 for c in communities {
290 self.communities.push(c.as_ref().to_vec());
291 }
292 self
293 }
294
295 /// Add a USM user for SNMPv3 authentication.
296 ///
297 /// Configure authentication and privacy settings using the closure.
298 /// Multiple users can be added with different security levels.
299 ///
300 /// # Security Levels
301 ///
302 /// - **noAuthNoPriv**: No authentication or encryption
303 /// - **authNoPriv**: Authentication only (HMAC verification)
304 /// - **authPriv**: Authentication and encryption
305 ///
306 /// # Example
307 ///
308 /// ```rust,no_run
309 /// use async_snmp::agent::Agent;
310 /// use async_snmp::{AuthProtocol, PrivProtocol};
311 ///
312 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
313 /// let agent = Agent::builder()
314 /// .bind("0.0.0.0:1161")
315 /// // Read-only user with authentication only
316 /// .usm_user("monitor", |u| {
317 /// u.auth(AuthProtocol::Sha256, b"monitorpass123")
318 /// })
319 /// // Admin user with full encryption
320 /// .usm_user("admin", |u| {
321 /// u.auth(AuthProtocol::Sha256, b"adminauth123")
322 /// .privacy(PrivProtocol::Aes128, b"adminpriv123")
323 /// })
324 /// .build()
325 /// .await?;
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub fn usm_user<F>(mut self, username: impl Into<Bytes>, configure: F) -> Self
330 where
331 F: FnOnce(UsmUserConfig) -> UsmUserConfig,
332 {
333 let username_bytes: Bytes = username.into();
334 let config = configure(UsmUserConfig::new(username_bytes.clone()));
335 self.usm_users.insert(username_bytes, config);
336 self
337 }
338
339 /// Set the engine ID for SNMPv3.
340 ///
341 /// If not set, a default engine ID will be generated based on the
342 /// RFC 3411 format using enterprise number and timestamp.
343 ///
344 /// # Example
345 ///
346 /// ```rust,no_run
347 /// use async_snmp::agent::Agent;
348 ///
349 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
350 /// let agent = Agent::builder()
351 /// .bind("0.0.0.0:1161")
352 /// .engine_id(b"\x80\x00\x00\x00\x01MyEngine".to_vec())
353 /// .community(b"public")
354 /// .build()
355 /// .await?;
356 /// # Ok(())
357 /// # }
358 /// ```
359 pub fn engine_id(mut self, engine_id: impl Into<Vec<u8>>) -> Self {
360 self.engine_id = Some(engine_id.into());
361 self
362 }
363
364 /// Set the maximum message size for responses.
365 ///
366 /// Default is 1472 octets (fits Ethernet MTU minus IP/UDP headers).
367 /// GETBULK responses will be truncated to fit within this limit.
368 ///
369 /// For SNMPv3 requests, the agent uses the minimum of this value
370 /// and the msgMaxSize from the request.
371 pub fn max_message_size(mut self, size: usize) -> Self {
372 self.max_message_size = size;
373 self
374 }
375
376 /// Set the maximum number of concurrent requests the agent will process.
377 ///
378 /// Default is 1000. Requests beyond this limit will queue until a slot
379 /// becomes available. Set to `None` for unbounded concurrency.
380 ///
381 /// This controls memory usage under high load while still allowing
382 /// parallel request processing.
383 pub fn max_concurrent_requests(mut self, limit: Option<usize>) -> Self {
384 self.max_concurrent_requests = limit;
385 self
386 }
387
388 /// Set the UDP socket receive buffer size.
389 ///
390 /// Default is 4MB. The kernel may cap this at `net.core.rmem_max`.
391 /// A larger buffer prevents packet loss during request bursts.
392 ///
393 /// Set to `None` to use the kernel default.
394 pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
395 self.recv_buffer_size = size;
396 self
397 }
398
399 /// Register a MIB handler for an OID subtree.
400 ///
401 /// Handlers are matched by longest prefix. When a request comes in,
402 /// the handler with the longest matching prefix is used.
403 ///
404 /// # Example
405 ///
406 /// ```rust,no_run
407 /// use async_snmp::agent::Agent;
408 /// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
409 /// use async_snmp::{Oid, Value, VarBind, oid};
410 /// use std::sync::Arc;
411 ///
412 /// struct SystemHandler;
413 /// impl MibHandler for SystemHandler {
414 /// fn get<'a>(&'a self, _: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
415 /// Box::pin(async move {
416 /// if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
417 /// GetResult::Value(Value::OctetString("My Agent".into()))
418 /// } else {
419 /// GetResult::NoSuchObject
420 /// }
421 /// })
422 /// }
423 /// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
424 /// Box::pin(async { GetNextResult::EndOfMibView })
425 /// }
426 /// }
427 ///
428 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
429 /// let agent = Agent::builder()
430 /// .bind("0.0.0.0:1161")
431 /// .community(b"public")
432 /// // Register handler for system MIB subtree
433 /// .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemHandler))
434 /// .build()
435 /// .await?;
436 /// # Ok(())
437 /// # }
438 /// ```
439 pub fn handler(mut self, prefix: Oid, handler: Arc<dyn MibHandler>) -> Self {
440 self.handlers.push(RegisteredHandler { prefix, handler });
441 self
442 }
443
444 /// Configure VACM (View-based Access Control Model) using a builder function.
445 ///
446 /// When VACM is configured, all requests are checked against the configured
447 /// access control rules. Requests that don't have proper access are rejected
448 /// with `noAccess` error (v2c/v3) or `noSuchName` (v1).
449 ///
450 /// **Without VACM configuration, the agent operates in permissive mode**:
451 /// any authenticated request has full read/write access to all handlers.
452 ///
453 /// # Example
454 ///
455 /// ```rust,no_run
456 /// use async_snmp::agent::{Agent, SecurityModel, VacmBuilder};
457 /// use async_snmp::message::SecurityLevel;
458 /// use async_snmp::oid;
459 ///
460 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
461 /// let agent = Agent::builder()
462 /// .bind("0.0.0.0:161")
463 /// .community(b"public")
464 /// .community(b"private")
465 /// .vacm(|v| v
466 /// .group("public", SecurityModel::V2c, "readonly_group")
467 /// .group("private", SecurityModel::V2c, "readwrite_group")
468 /// .access("readonly_group", |a| a
469 /// .read_view("full_view"))
470 /// .access("readwrite_group", |a| a
471 /// .read_view("full_view")
472 /// .write_view("write_view"))
473 /// .view("full_view", |v| v
474 /// .include(oid!(1, 3, 6, 1)))
475 /// .view("write_view", |v| v
476 /// .include(oid!(1, 3, 6, 1, 2, 1, 1))))
477 /// .build()
478 /// .await?;
479 /// # Ok(())
480 /// # }
481 /// ```
482 pub fn vacm<F>(mut self, configure: F) -> Self
483 where
484 F: FnOnce(VacmBuilder) -> VacmBuilder,
485 {
486 let builder = VacmBuilder::new();
487 self.vacm = Some(configure(builder).build());
488 self
489 }
490
491 /// Set a cancellation token for graceful shutdown.
492 ///
493 /// If not set, the agent creates its own token accessible via `Agent::cancel()`.
494 pub fn cancel(mut self, token: CancellationToken) -> Self {
495 self.cancel = Some(token);
496 self
497 }
498
499 /// Build the agent.
500 pub async fn build(mut self) -> Result<Agent> {
501 let bind_addr: std::net::SocketAddr = self.bind_addr.parse().map_err(|_| {
502 Error::Config(format!("invalid bind address: {}", self.bind_addr).into())
503 })?;
504
505 let socket = bind_udp_socket(bind_addr, self.recv_buffer_size)
506 .await
507 .map_err(|e| Error::Network {
508 target: bind_addr,
509 source: e,
510 })?;
511
512 let local_addr = socket.local_addr().map_err(|e| Error::Network {
513 target: bind_addr,
514 source: e,
515 })?;
516
517 let socket_state =
518 UdpSocketState::new(UdpSockRef::from(&socket)).map_err(|e| Error::Network {
519 target: bind_addr,
520 source: e,
521 })?;
522
523 // Generate default engine ID if not provided
524 let engine_id = self.engine_id.unwrap_or_else(|| {
525 // RFC 3411 format: enterprise number + format + local identifier
526 // Use a simple format: 0x80 (local) + timestamp + random
527 let mut id = vec![0x80, 0x00, 0x00, 0x00, 0x01]; // Enterprise format indicator
528 let timestamp = std::time::SystemTime::now()
529 .duration_since(std::time::UNIX_EPOCH)
530 .unwrap_or_default()
531 .as_secs();
532 id.extend_from_slice(×tamp.to_be_bytes());
533 id
534 });
535
536 // Sort handlers by prefix length (longest first) for matching
537 self.handlers
538 .sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
539
540 let cancel = self.cancel.unwrap_or_default();
541
542 // Create concurrency limiter if configured
543 let concurrency_limit = self
544 .max_concurrent_requests
545 .map(|n| Arc::new(Semaphore::new(n)));
546
547 Ok(Agent {
548 inner: Arc::new(AgentInner {
549 socket: Arc::new(socket),
550 socket_state,
551 local_addr,
552 communities: self.communities,
553 usm_users: self.usm_users,
554 handlers: self.handlers,
555 engine_id,
556 engine_boots: AtomicU32::new(1),
557 engine_time: AtomicU32::new(0),
558 engine_start: Instant::now(),
559 salt_counter: SaltCounter::new(),
560 max_message_size: self.max_message_size,
561 concurrency_limit,
562 vacm: self.vacm,
563 snmp_invalid_msgs: AtomicU32::new(0),
564 snmp_unknown_security_models: AtomicU32::new(0),
565 snmp_silent_drops: AtomicU32::new(0),
566 cancel,
567 }),
568 })
569 }
570}
571
572impl Default for AgentBuilder {
573 fn default() -> Self {
574 Self::new()
575 }
576}
577
578/// Inner state shared across agent clones.
579pub(crate) struct AgentInner {
580 pub(crate) socket: Arc<UdpSocket>,
581 pub(crate) socket_state: UdpSocketState,
582 pub(crate) local_addr: SocketAddr,
583 pub(crate) communities: Vec<Vec<u8>>,
584 pub(crate) usm_users: HashMap<Bytes, UsmUserConfig>,
585 pub(crate) handlers: Vec<RegisteredHandler>,
586 pub(crate) engine_id: Vec<u8>,
587 pub(crate) engine_boots: AtomicU32,
588 pub(crate) engine_time: AtomicU32,
589 pub(crate) engine_start: Instant,
590 pub(crate) salt_counter: SaltCounter,
591 pub(crate) max_message_size: usize,
592 pub(crate) concurrency_limit: Option<Arc<Semaphore>>,
593 pub(crate) vacm: Option<VacmConfig>,
594 // RFC 3412 statistics counters
595 /// snmpInvalidMsgs (1.3.6.1.6.3.11.2.1.2) - messages with invalid msgFlags
596 /// (e.g., privacy without authentication)
597 pub(crate) snmp_invalid_msgs: AtomicU32,
598 /// snmpUnknownSecurityModels (1.3.6.1.6.3.11.2.1.1) - messages with
599 /// unrecognized security model
600 pub(crate) snmp_unknown_security_models: AtomicU32,
601 /// snmpSilentDrops (1.3.6.1.6.3.11.2.1.3) - confirmed-class PDUs silently
602 /// dropped because even an empty response would exceed max message size
603 pub(crate) snmp_silent_drops: AtomicU32,
604 /// Cancellation token for graceful shutdown.
605 pub(crate) cancel: CancellationToken,
606}
607
608/// SNMP Agent.
609///
610/// Listens for and responds to SNMP requests (GET, GETNEXT, GETBULK, SET).
611///
612/// # Example
613///
614/// ```rust,no_run
615/// use async_snmp::agent::Agent;
616/// use async_snmp::oid;
617///
618/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
619/// let agent = Agent::builder()
620/// .bind("0.0.0.0:161")
621/// .community(b"public")
622/// .build()
623/// .await?;
624///
625/// agent.run().await
626/// # }
627/// ```
628pub struct Agent {
629 pub(crate) inner: Arc<AgentInner>,
630}
631
632impl Agent {
633 /// Create a builder for configuring the agent.
634 pub fn builder() -> AgentBuilder {
635 AgentBuilder::new()
636 }
637
638 /// Get the local address the agent is bound to.
639 pub fn local_addr(&self) -> SocketAddr {
640 self.inner.local_addr
641 }
642
643 /// Get the engine ID.
644 pub fn engine_id(&self) -> &[u8] {
645 &self.inner.engine_id
646 }
647
648 /// Get the cancellation token for this agent.
649 ///
650 /// Call `token.cancel()` to initiate graceful shutdown.
651 pub fn cancel(&self) -> CancellationToken {
652 self.inner.cancel.clone()
653 }
654
655 /// Get the snmpInvalidMsgs counter value.
656 ///
657 /// This counter tracks messages with invalid msgFlags, such as
658 /// privacy-without-authentication (RFC 3412 Section 7.2 Step 5d).
659 ///
660 /// OID: 1.3.6.1.6.3.11.2.1.2
661 pub fn snmp_invalid_msgs(&self) -> u32 {
662 self.inner.snmp_invalid_msgs.load(Ordering::Relaxed)
663 }
664
665 /// Get the snmpUnknownSecurityModels counter value.
666 ///
667 /// This counter tracks messages with unrecognized security models
668 /// (RFC 3412 Section 7.2 Step 2).
669 ///
670 /// OID: 1.3.6.1.6.3.11.2.1.1
671 pub fn snmp_unknown_security_models(&self) -> u32 {
672 self.inner
673 .snmp_unknown_security_models
674 .load(Ordering::Relaxed)
675 }
676
677 /// Get the snmpSilentDrops counter value.
678 ///
679 /// This counter tracks confirmed-class PDUs (GetRequest, GetNextRequest,
680 /// GetBulkRequest, SetRequest, InformRequest) that were silently dropped
681 /// because even an empty Response-PDU would exceed the maximum message
682 /// size constraint (RFC 3412 Section 7.1).
683 ///
684 /// OID: 1.3.6.1.6.3.11.2.1.3
685 pub fn snmp_silent_drops(&self) -> u32 {
686 self.inner.snmp_silent_drops.load(Ordering::Relaxed)
687 }
688
689 /// Run the agent, processing requests concurrently.
690 ///
691 /// Requests are processed in parallel up to the configured
692 /// `max_concurrent_requests` limit (default: 1000). This method runs
693 /// until the cancellation token is triggered.
694 #[instrument(skip(self), err, fields(snmp.local_addr = %self.local_addr()))]
695 pub async fn run(&self) -> Result<()> {
696 let mut buf = vec![0u8; 65535];
697
698 loop {
699 let recv_meta = tokio::select! {
700 result = self.recv_packet(&mut buf) => {
701 result?
702 }
703 _ = self.inner.cancel.cancelled() => {
704 tracing::info!(target: "async_snmp::agent", "agent shutdown requested");
705 return Ok(());
706 }
707 };
708
709 let data = Bytes::copy_from_slice(&buf[..recv_meta.len]);
710 let agent = self.clone();
711
712 let permit = if let Some(ref sem) = self.inner.concurrency_limit {
713 Some(sem.clone().acquire_owned().await.expect("semaphore closed"))
714 } else {
715 None
716 };
717
718 tokio::spawn(async move {
719 agent.update_engine_time();
720
721 match agent.handle_request(data, recv_meta.addr).await {
722 Ok(Some(response_bytes)) => {
723 if let Err(e) = agent.send_response(&response_bytes, &recv_meta).await {
724 tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "failed to send response");
725 }
726 }
727 Ok(None) => {}
728 Err(e) => {
729 tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "error handling request");
730 }
731 }
732
733 drop(permit);
734 });
735 }
736 }
737
738 async fn recv_packet(&self, buf: &mut [u8]) -> Result<RecvMeta> {
739 let mut iov = [IoSliceMut::new(buf)];
740 let mut meta = [RecvMeta::default()];
741
742 loop {
743 self.inner
744 .socket
745 .readable()
746 .await
747 .map_err(|e| Error::Network {
748 target: self.inner.local_addr,
749 source: e,
750 })?;
751
752 let result = self.inner.socket.try_io(tokio::io::Interest::READABLE, || {
753 let sref = UdpSockRef::from(&*self.inner.socket);
754 self.inner.socket_state.recv(sref, &mut iov, &mut meta)
755 });
756
757 match result {
758 Ok(n) if n > 0 => return Ok(meta[0]),
759 Ok(_) => continue,
760 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
761 Err(e) => {
762 return Err(Error::Network {
763 target: self.inner.local_addr,
764 source: e,
765 }
766 .boxed());
767 }
768 }
769 }
770 }
771
772 async fn send_response(&self, data: &[u8], recv_meta: &RecvMeta) -> std::io::Result<()> {
773 let transmit = Transmit {
774 destination: recv_meta.addr,
775 ecn: None,
776 contents: data,
777 segment_size: None,
778 src_ip: recv_meta.dst_ip,
779 };
780
781 loop {
782 self.inner.socket.writable().await?;
783
784 let result = self.inner.socket.try_io(tokio::io::Interest::WRITABLE, || {
785 let sref = UdpSockRef::from(&*self.inner.socket);
786 self.inner.socket_state.try_send(sref, &transmit)
787 });
788
789 match result {
790 Ok(()) => return Ok(()),
791 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
792 Err(e) => return Err(e),
793 }
794 }
795 }
796
797 /// Process a single request and return the response bytes.
798 ///
799 /// Returns `None` if no response should be sent.
800 async fn handle_request(&self, data: Bytes, source: SocketAddr) -> Result<Option<Bytes>> {
801 // Peek at version
802 let mut decoder = Decoder::with_target(data.clone(), source);
803 let mut seq = decoder.read_sequence()?;
804 let version_num = seq.read_integer()?;
805 let version = Version::from_i32(version_num).ok_or_else(|| {
806 tracing::debug!(target: "async_snmp::agent", { source = %source, kind = %DecodeErrorKind::UnknownVersion(version_num) }, "unknown SNMP version");
807 Error::MalformedResponse { target: source }.boxed()
808 })?;
809 drop(seq);
810 drop(decoder);
811
812 match version {
813 Version::V1 => self.handle_v1(data, source).await,
814 Version::V2c => self.handle_v2c(data, source).await,
815 Version::V3 => self.handle_v3(data, source).await,
816 }
817 }
818
819 /// Update engine time based on elapsed time since start.
820 fn update_engine_time(&self) {
821 let elapsed = self.inner.engine_start.elapsed().as_secs() as u32;
822 self.inner.engine_time.store(elapsed, Ordering::Relaxed);
823 }
824
825 /// Validate community string using constant-time comparison.
826 ///
827 /// Uses constant-time comparison to prevent timing attacks that could
828 /// be used to guess valid community strings character by character.
829 pub(crate) fn validate_community(&self, community: &[u8]) -> bool {
830 if self.inner.communities.is_empty() {
831 // No communities configured = reject all
832 return false;
833 }
834 // Use constant-time comparison for each community string.
835 // We compare against all configured communities regardless of
836 // early matches to maintain constant-time behavior.
837 let mut valid = false;
838 for configured in &self.inner.communities {
839 // ct_eq returns a Choice, which we convert to bool after comparison
840 if configured.len() == community.len()
841 && bool::from(configured.as_slice().ct_eq(community))
842 {
843 valid = true;
844 }
845 }
846 valid
847 }
848
849 /// Dispatch a request to the appropriate handler.
850 async fn dispatch_request(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
851 match pdu.pdu_type {
852 PduType::GetRequest => self.handle_get(ctx, pdu).await,
853 PduType::GetNextRequest => self.handle_get_next(ctx, pdu).await,
854 PduType::GetBulkRequest => self.handle_get_bulk(ctx, pdu).await,
855 PduType::SetRequest => self.handle_set(ctx, pdu).await,
856 PduType::InformRequest => self.handle_inform(pdu),
857 _ => {
858 // Should not happen - filtered earlier
859 Ok(pdu.to_error_response(ErrorStatus::GenErr, 0))
860 }
861 }
862 }
863
864 /// Handle InformRequest PDU.
865 ///
866 /// Per RFC 3416 Section 4.2.7, an InformRequest is a confirmed-class PDU
867 /// that the receiver acknowledges by returning a Response with the same
868 /// request-id and varbind list.
869 fn handle_inform(&self, pdu: &Pdu) -> Result<Pdu> {
870 // Simply acknowledge by returning the same varbinds in a Response
871 Ok(Pdu {
872 pdu_type: PduType::Response,
873 request_id: pdu.request_id,
874 error_status: 0,
875 error_index: 0,
876 varbinds: pdu.varbinds.clone(),
877 })
878 }
879
880 /// Handle GET request.
881 async fn handle_get(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
882 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
883
884 for (index, vb) in pdu.varbinds.iter().enumerate() {
885 // VACM read access check
886 if let Some(ref vacm) = self.inner.vacm
887 && !vacm.check_access(ctx.read_view.as_ref(), &vb.oid)
888 {
889 // v1: noSuchName, v2c/v3: noAccess or NoSuchObject
890 if ctx.version == Version::V1 {
891 return Ok(Pdu {
892 pdu_type: PduType::Response,
893 request_id: pdu.request_id,
894 error_status: ErrorStatus::NoSuchName.as_i32(),
895 error_index: (index + 1) as i32,
896 varbinds: pdu.varbinds.clone(),
897 });
898 } else {
899 // For GET, return NoSuchObject for inaccessible OIDs per RFC 3415
900 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::NoSuchObject));
901 continue;
902 }
903 }
904
905 let result = if let Some(handler) = self.find_handler(&vb.oid) {
906 handler.handler.get(ctx, &vb.oid).await
907 } else {
908 GetResult::NoSuchObject
909 };
910
911 let response_value = match result {
912 GetResult::Value(v) => v,
913 GetResult::NoSuchObject => {
914 // v1 returns noSuchName error, v2c/v3 returns NoSuchObject exception
915 if ctx.version == Version::V1 {
916 return Ok(Pdu {
917 pdu_type: PduType::Response,
918 request_id: pdu.request_id,
919 error_status: ErrorStatus::NoSuchName.as_i32(),
920 error_index: (index + 1) as i32,
921 varbinds: pdu.varbinds.clone(),
922 });
923 } else {
924 Value::NoSuchObject
925 }
926 }
927 GetResult::NoSuchInstance => {
928 // v1 returns noSuchName error, v2c/v3 returns NoSuchInstance exception
929 if ctx.version == Version::V1 {
930 return Ok(Pdu {
931 pdu_type: PduType::Response,
932 request_id: pdu.request_id,
933 error_status: ErrorStatus::NoSuchName.as_i32(),
934 error_index: (index + 1) as i32,
935 varbinds: pdu.varbinds.clone(),
936 });
937 } else {
938 Value::NoSuchInstance
939 }
940 }
941 };
942
943 response_varbinds.push(VarBind::new(vb.oid.clone(), response_value));
944 }
945
946 Ok(Pdu {
947 pdu_type: PduType::Response,
948 request_id: pdu.request_id,
949 error_status: 0,
950 error_index: 0,
951 varbinds: response_varbinds,
952 })
953 }
954
955 /// Handle GETNEXT request.
956 async fn handle_get_next(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
957 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
958
959 for (index, vb) in pdu.varbinds.iter().enumerate() {
960 // Try to find the next OID from any handler
961 let next = self.get_next_oid(ctx, &vb.oid).await;
962
963 // Check VACM access for the returned OID (if VACM enabled)
964 let next = if let Some(ref next_vb) = next {
965 if let Some(ref vacm) = self.inner.vacm {
966 if vacm.check_access(ctx.read_view.as_ref(), &next_vb.oid) {
967 next
968 } else {
969 // OID not accessible, continue searching
970 // For simplicity, we just return EndOfMibView here
971 // A more complete implementation would continue the search
972 None
973 }
974 } else {
975 next
976 }
977 } else {
978 next
979 };
980
981 match next {
982 Some(next_vb) => {
983 response_varbinds.push(next_vb);
984 }
985 None => {
986 // v1 returns noSuchName, v2c/v3 returns endOfMibView
987 if ctx.version == Version::V1 {
988 return Ok(Pdu {
989 pdu_type: PduType::Response,
990 request_id: pdu.request_id,
991 error_status: ErrorStatus::NoSuchName.as_i32(),
992 error_index: (index + 1) as i32,
993 varbinds: pdu.varbinds.clone(),
994 });
995 } else {
996 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::EndOfMibView));
997 }
998 }
999 }
1000 }
1001
1002 Ok(Pdu {
1003 pdu_type: PduType::Response,
1004 request_id: pdu.request_id,
1005 error_status: 0,
1006 error_index: 0,
1007 varbinds: response_varbinds,
1008 })
1009 }
1010
1011 /// Handle GETBULK request.
1012 ///
1013 /// Per RFC 3416 Section 4.2.3, if the response would exceed the message
1014 /// size limit, we return fewer variable bindings rather than all of them.
1015 async fn handle_get_bulk(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
1016 // For GETBULK, error_status is non_repeaters and error_index is max_repetitions
1017 let non_repeaters = pdu.error_status.max(0) as usize;
1018 let max_repetitions = pdu.error_index.max(0) as usize;
1019
1020 let mut response_varbinds = Vec::new();
1021 let mut current_size: usize = RESPONSE_OVERHEAD;
1022 let max_size = self.inner.max_message_size;
1023
1024 // Helper to check if we can add a varbind
1025 let can_add = |vb: &VarBind, current_size: usize| -> bool {
1026 current_size + vb.encoded_size() <= max_size
1027 };
1028
1029 // Handle non-repeaters (first N varbinds get one GETNEXT each)
1030 for vb in pdu.varbinds.iter().take(non_repeaters) {
1031 let next_vb = match self.get_next_oid(ctx, &vb.oid).await {
1032 Some(next_vb) => next_vb,
1033 None => VarBind::new(vb.oid.clone(), Value::EndOfMibView),
1034 };
1035
1036 if !can_add(&next_vb, current_size) {
1037 // Can't fit even non-repeaters, return tooBig if we have nothing
1038 if response_varbinds.is_empty() {
1039 return Ok(Pdu {
1040 pdu_type: PduType::Response,
1041 request_id: pdu.request_id,
1042 error_status: ErrorStatus::TooBig.as_i32(),
1043 error_index: 0,
1044 varbinds: pdu.varbinds.clone(),
1045 });
1046 }
1047 // Otherwise return what we have
1048 break;
1049 }
1050
1051 current_size += next_vb.encoded_size();
1052 response_varbinds.push(next_vb);
1053 }
1054
1055 // Handle repeaters
1056 if non_repeaters < pdu.varbinds.len() {
1057 let repeaters = &pdu.varbinds[non_repeaters..];
1058 let mut current_oids: Vec<Oid> = repeaters.iter().map(|vb| vb.oid.clone()).collect();
1059 let mut all_done = vec![false; repeaters.len()];
1060
1061 'outer: for _ in 0..max_repetitions {
1062 let mut row_complete = true;
1063 for (i, oid) in current_oids.iter_mut().enumerate() {
1064 let next_vb = if all_done[i] {
1065 VarBind::new(oid.clone(), Value::EndOfMibView)
1066 } else {
1067 match self.get_next_oid(ctx, oid).await {
1068 Some(next_vb) => {
1069 *oid = next_vb.oid.clone();
1070 row_complete = false;
1071 next_vb
1072 }
1073 None => {
1074 all_done[i] = true;
1075 VarBind::new(oid.clone(), Value::EndOfMibView)
1076 }
1077 }
1078 };
1079
1080 // Check size before adding
1081 if !can_add(&next_vb, current_size) {
1082 // Can't fit more, return what we have
1083 break 'outer;
1084 }
1085
1086 current_size += next_vb.encoded_size();
1087 response_varbinds.push(next_vb);
1088 }
1089
1090 if row_complete {
1091 break;
1092 }
1093 }
1094 }
1095
1096 Ok(Pdu {
1097 pdu_type: PduType::Response,
1098 request_id: pdu.request_id,
1099 error_status: 0,
1100 error_index: 0,
1101 varbinds: response_varbinds,
1102 })
1103 }
1104
1105 /// Find the handler for a given OID.
1106 pub(crate) fn find_handler(&self, oid: &Oid) -> Option<&RegisteredHandler> {
1107 // Handlers are sorted by prefix length (longest first)
1108 self.inner
1109 .handlers
1110 .iter()
1111 .find(|&handler| handler.handler.handles(&handler.prefix, oid))
1112 .map(|v| v as _)
1113 }
1114
1115 /// Get the next OID from any handler.
1116 async fn get_next_oid(&self, ctx: &RequestContext, oid: &Oid) -> Option<VarBind> {
1117 // Find the first handler that can provide a next OID
1118 let mut best_result: Option<VarBind> = None;
1119
1120 for handler in &self.inner.handlers {
1121 if let GetNextResult::Value(next) = handler.handler.get_next(ctx, oid).await {
1122 // Must be lexicographically greater than the request OID
1123 if next.oid > *oid {
1124 match &best_result {
1125 None => best_result = Some(next),
1126 Some(current) if next.oid < current.oid => best_result = Some(next),
1127 _ => {}
1128 }
1129 }
1130 }
1131 }
1132
1133 best_result
1134 }
1135}
1136
1137impl Clone for Agent {
1138 fn clone(&self) -> Self {
1139 Self {
1140 inner: Arc::clone(&self.inner),
1141 }
1142 }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::handler::{
1149 BoxFuture, GetNextResult, GetResult, MibHandler, RequestContext, SecurityModel, SetResult,
1150 };
1151 use crate::message::SecurityLevel;
1152 use crate::oid;
1153
1154 struct TestHandler;
1155
1156 impl MibHandler for TestHandler {
1157 fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
1158 Box::pin(async move {
1159 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0) {
1160 return GetResult::Value(Value::Integer(42));
1161 }
1162 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0) {
1163 return GetResult::Value(Value::OctetString(Bytes::from_static(b"test")));
1164 }
1165 GetResult::NoSuchObject
1166 })
1167 }
1168
1169 fn get_next<'a>(
1170 &'a self,
1171 _ctx: &'a RequestContext,
1172 oid: &'a Oid,
1173 ) -> BoxFuture<'a, GetNextResult> {
1174 Box::pin(async move {
1175 let oid1 = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
1176 let oid2 = oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0);
1177
1178 if oid < &oid1 {
1179 return GetNextResult::Value(VarBind::new(oid1, Value::Integer(42)));
1180 }
1181 if oid < &oid2 {
1182 return GetNextResult::Value(VarBind::new(
1183 oid2,
1184 Value::OctetString(Bytes::from_static(b"test")),
1185 ));
1186 }
1187 GetNextResult::EndOfMibView
1188 })
1189 }
1190 }
1191
1192 fn test_ctx() -> RequestContext {
1193 RequestContext {
1194 source: "127.0.0.1:12345".parse().unwrap(),
1195 version: Version::V2c,
1196 security_model: SecurityModel::V2c,
1197 security_name: Bytes::from_static(b"public"),
1198 security_level: SecurityLevel::NoAuthNoPriv,
1199 context_name: Bytes::new(),
1200 request_id: 1,
1201 pdu_type: PduType::GetRequest,
1202 group_name: None,
1203 read_view: None,
1204 write_view: None,
1205 }
1206 }
1207
1208 #[test]
1209 fn test_agent_builder_defaults() {
1210 let builder = AgentBuilder::new();
1211 assert_eq!(builder.bind_addr, "0.0.0.0:161");
1212 assert!(builder.communities.is_empty());
1213 assert!(builder.usm_users.is_empty());
1214 assert!(builder.handlers.is_empty());
1215 }
1216
1217 #[test]
1218 fn test_agent_builder_community() {
1219 let builder = AgentBuilder::new()
1220 .community(b"public")
1221 .community(b"private");
1222 assert_eq!(builder.communities.len(), 2);
1223 }
1224
1225 #[test]
1226 fn test_agent_builder_communities() {
1227 let builder = AgentBuilder::new().communities(["public", "private"]);
1228 assert_eq!(builder.communities.len(), 2);
1229 }
1230
1231 #[test]
1232 fn test_agent_builder_handler() {
1233 let builder =
1234 AgentBuilder::new().handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(TestHandler));
1235 assert_eq!(builder.handlers.len(), 1);
1236 }
1237
1238 #[tokio::test]
1239 async fn test_mib_handler_default_set() {
1240 let handler = TestHandler;
1241 let mut ctx = test_ctx();
1242 ctx.pdu_type = PduType::SetRequest;
1243
1244 let result = handler
1245 .test_set(&ctx, &oid!(1, 3, 6, 1), &Value::Integer(1))
1246 .await;
1247 assert_eq!(result, SetResult::NotWritable);
1248 }
1249
1250 #[test]
1251 fn test_mib_handler_handles() {
1252 let handler = TestHandler;
1253 let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);
1254
1255 // OID within prefix
1256 assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0)));
1257
1258 // OID before prefix (GETNEXT should still try)
1259 assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99998)));
1260
1261 // OID after prefix (not handled)
1262 assert!(!handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 100000)));
1263 }
1264
1265 #[tokio::test]
1266 async fn test_test_handler_get() {
1267 let handler = TestHandler;
1268 let ctx = test_ctx();
1269
1270 // Existing OID
1271 let result = handler
1272 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1273 .await;
1274 assert!(matches!(result, GetResult::Value(Value::Integer(42))));
1275
1276 // Non-existing OID
1277 let result = handler
1278 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 99, 0))
1279 .await;
1280 assert!(matches!(result, GetResult::NoSuchObject));
1281 }
1282
1283 #[tokio::test]
1284 async fn test_test_handler_get_next() {
1285 let handler = TestHandler;
1286 let mut ctx = test_ctx();
1287 ctx.pdu_type = PduType::GetNextRequest;
1288
1289 // Before first OID
1290 let next = handler.get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999)).await;
1291 assert!(next.is_value());
1292 if let GetNextResult::Value(vb) = next {
1293 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0));
1294 }
1295
1296 // Between OIDs
1297 let next = handler
1298 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1299 .await;
1300 assert!(next.is_value());
1301 if let GetNextResult::Value(vb) = next {
1302 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0));
1303 }
1304
1305 // After last OID
1306 let next = handler
1307 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0))
1308 .await;
1309 assert!(next.is_end_of_mib_view());
1310 }
1311}