async_snmp/agent/mod.rs
1//! SNMP Agent (RFC 3413).
2//!
3//! This module provides SNMP agent functionality for responding to
4//! GET, GETNEXT, GETBULK, and SET requests.
5//!
6//! # Features
7//!
8//! - **Async handlers**: All handler methods are async for database queries, network calls, etc.
9//! - **Atomic SET**: Two-phase commit protocol (test/commit/undo) per RFC 3416
10//! - **VACM support**: Optional View-based Access Control Model (RFC 3415)
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use async_snmp::agent::Agent;
16//! use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
17//! use async_snmp::{Oid, Value, VarBind, oid};
18//! use std::sync::Arc;
19//!
20//! // Define a simple handler for the system MIB subtree
21//! struct SystemMibHandler;
22//!
23//! impl MibHandler for SystemMibHandler {
24//! fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
25//! Box::pin(async move {
26//! // sysDescr.0
27//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
28//! return GetResult::Value(Value::OctetString("My SNMP Agent".into()));
29//! }
30//! // sysObjectID.0
31//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 2, 0) {
32//! return GetResult::Value(Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999)));
33//! }
34//! GetResult::NoSuchObject
35//! })
36//! }
37//!
38//! fn get_next<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetNextResult> {
39//! Box::pin(async move {
40//! // Return the lexicographically next OID after the given one
41//! let sys_descr = oid!(1, 3, 6, 1, 2, 1, 1, 1, 0);
42//! let sys_object_id = oid!(1, 3, 6, 1, 2, 1, 1, 2, 0);
43//!
44//! if oid < &sys_descr {
45//! return GetNextResult::Value(VarBind::new(sys_descr, Value::OctetString("My SNMP Agent".into())));
46//! }
47//! if oid < &sys_object_id {
48//! return GetNextResult::Value(VarBind::new(sys_object_id, Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999))));
49//! }
50//! GetNextResult::EndOfMibView
51//! })
52//! }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), Box<async_snmp::Error>> {
57//! let agent = Agent::builder()
58//! .bind("0.0.0.0:161")
59//! .community(b"public")
60//! .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemMibHandler))
61//! .build()
62//! .await?;
63//!
64//! agent.run().await
65//! }
66//! ```
67
68mod request;
69mod response;
70mod set_handler;
71pub mod vacm;
72
73pub use vacm::{SecurityModel, VacmBuilder, VacmConfig, View, ViewCheckResult, ViewSubtree};
74
75use std::collections::HashMap;
76use std::net::SocketAddr;
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU32, Ordering};
79use std::time::Instant;
80
81use bytes::Bytes;
82use subtle::ConstantTimeEq;
83use tokio::net::UdpSocket;
84use tokio::sync::Semaphore;
85use tokio_util::sync::CancellationToken;
86use tracing::instrument;
87
88use std::io::IoSliceMut;
89
90use quinn_udp::{RecvMeta, Transmit, UdpSockRef, UdpSocketState};
91
92use crate::ber::Decoder;
93use crate::error::internal::DecodeErrorKind;
94use crate::error::{Error, ErrorStatus, Result};
95use crate::handler::{GetNextResult, GetResult, MibHandler, RequestContext};
96use crate::notification::UsmUserConfig;
97use crate::oid::Oid;
98use crate::pdu::{Pdu, PduType};
99use crate::util::bind_udp_socket;
100use crate::v3::SaltCounter;
101use crate::value::Value;
102use crate::varbind::VarBind;
103use crate::version::Version;
104
105/// Default maximum message size for UDP (RFC 3417 recommendation).
106const DEFAULT_MAX_MESSAGE_SIZE: usize = 1472;
107
108/// Overhead for SNMP message encoding (approximate conservative estimate).
109/// This accounts for version, community/USM, PDU headers, etc.
110const RESPONSE_OVERHEAD: usize = 100;
111
112/// Registered handler with its OID prefix.
113pub(crate) struct RegisteredHandler {
114 pub(crate) prefix: Oid,
115 pub(crate) handler: Arc<dyn MibHandler>,
116}
117
118/// Builder for [`Agent`].
119///
120/// Use this builder to configure and construct an SNMP agent. The builder
121/// pattern allows you to chain configuration methods before calling
122/// [`build()`](AgentBuilder::build) to create the agent.
123///
124/// # Minimal Example
125///
126/// ```rust,no_run
127/// use async_snmp::agent::Agent;
128/// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
129/// use async_snmp::{Oid, Value, VarBind, oid};
130/// use std::sync::Arc;
131///
132/// struct MyHandler;
133/// impl MibHandler for MyHandler {
134/// fn get<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetResult> {
135/// Box::pin(async { GetResult::NoSuchObject })
136/// }
137/// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
138/// Box::pin(async { GetNextResult::EndOfMibView })
139/// }
140/// }
141///
142/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
143/// let agent = Agent::builder()
144/// .bind("0.0.0.0:1161") // Use non-privileged port
145/// .community(b"public")
146/// .handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(MyHandler))
147/// .build()
148/// .await?;
149/// # Ok(())
150/// # }
151/// ```
152pub struct AgentBuilder {
153 bind_addr: String,
154 communities: Vec<Vec<u8>>,
155 usm_users: HashMap<Bytes, UsmUserConfig>,
156 handlers: Vec<RegisteredHandler>,
157 engine_id: Option<Vec<u8>>,
158 max_message_size: usize,
159 max_concurrent_requests: Option<usize>,
160 recv_buffer_size: Option<usize>,
161 vacm: Option<VacmConfig>,
162 cancel: Option<CancellationToken>,
163}
164
165impl AgentBuilder {
166 /// Create a new builder with default settings.
167 ///
168 /// Defaults:
169 /// - Bind address: `0.0.0.0:161` (UDP)
170 /// - Max message size: 1472 bytes (Ethernet MTU - IP/UDP headers)
171 /// - Max concurrent requests: 1000
172 /// - Receive buffer size: 4MB (requested from kernel)
173 /// - No communities or USM users (all requests rejected)
174 /// - No handlers registered
175 pub fn new() -> Self {
176 Self {
177 bind_addr: "0.0.0.0:161".to_string(),
178 communities: Vec::new(),
179 usm_users: HashMap::new(),
180 handlers: Vec::new(),
181 engine_id: None,
182 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
183 max_concurrent_requests: Some(1000),
184 recv_buffer_size: Some(4 * 1024 * 1024), // 4MB
185 vacm: None,
186 cancel: None,
187 }
188 }
189
190 /// Set the UDP bind address.
191 ///
192 /// Default is `0.0.0.0:161` (standard SNMP agent port). Note that binding
193 /// to UDP port 161 typically requires root/administrator privileges.
194 ///
195 /// # IPv4 Examples
196 ///
197 /// ```rust,no_run
198 /// use async_snmp::agent::Agent;
199 ///
200 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
201 /// // Bind to all IPv4 interfaces on standard port (requires privileges)
202 /// let agent = Agent::builder().bind("0.0.0.0:161").community(b"public").build().await?;
203 ///
204 /// // Bind to localhost only on non-privileged port
205 /// let agent = Agent::builder().bind("127.0.0.1:1161").community(b"public").build().await?;
206 ///
207 /// // Bind to specific interface
208 /// let agent = Agent::builder().bind("192.168.1.100:161").community(b"public").build().await?;
209 /// # Ok(())
210 /// # }
211 /// ```
212 ///
213 /// # IPv6 / Dual-Stack Examples
214 ///
215 /// ```rust,no_run
216 /// use async_snmp::agent::Agent;
217 ///
218 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
219 /// // Bind to all interfaces via dual-stack (handles both IPv4 and IPv6)
220 /// let agent = Agent::builder().bind("[::]:161").community(b"public").build().await?;
221 ///
222 /// // Bind to IPv6 localhost only
223 /// let agent = Agent::builder().bind("[::1]:1161").community(b"public").build().await?;
224 /// # Ok(())
225 /// # }
226 /// ```
227 pub fn bind(mut self, addr: impl Into<String>) -> Self {
228 self.bind_addr = addr.into();
229 self
230 }
231
232 /// Add an accepted community string for v1/v2c requests.
233 ///
234 /// Multiple communities can be added. If none are added,
235 /// all v1/v2c requests are rejected.
236 ///
237 /// # Example
238 ///
239 /// ```rust,no_run
240 /// use async_snmp::agent::Agent;
241 ///
242 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
243 /// let agent = Agent::builder()
244 /// .bind("0.0.0.0:1161")
245 /// .community(b"public") // Read-only access
246 /// .community(b"private") // Read-write access (with VACM)
247 /// .build()
248 /// .await?;
249 /// # Ok(())
250 /// # }
251 /// ```
252 pub fn community(mut self, community: &[u8]) -> Self {
253 self.communities.push(community.to_vec());
254 self
255 }
256
257 /// Add multiple community strings.
258 ///
259 /// # Example
260 ///
261 /// ```rust,no_run
262 /// use async_snmp::agent::Agent;
263 ///
264 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
265 /// let communities = ["public", "private", "monitor"];
266 /// let agent = Agent::builder()
267 /// .bind("0.0.0.0:1161")
268 /// .communities(communities)
269 /// .build()
270 /// .await?;
271 /// # Ok(())
272 /// # }
273 /// ```
274 pub fn communities<I, C>(mut self, communities: I) -> Self
275 where
276 I: IntoIterator<Item = C>,
277 C: AsRef<[u8]>,
278 {
279 for c in communities {
280 self.communities.push(c.as_ref().to_vec());
281 }
282 self
283 }
284
285 /// Add a USM user for SNMPv3 authentication.
286 ///
287 /// Configure authentication and privacy settings using the closure.
288 /// Multiple users can be added with different security levels.
289 ///
290 /// # Security Levels
291 ///
292 /// - **noAuthNoPriv**: No authentication or encryption
293 /// - **authNoPriv**: Authentication only (HMAC verification)
294 /// - **authPriv**: Authentication and encryption
295 ///
296 /// # Example
297 ///
298 /// ```rust,no_run
299 /// use async_snmp::agent::Agent;
300 /// use async_snmp::{AuthProtocol, PrivProtocol};
301 ///
302 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
303 /// let agent = Agent::builder()
304 /// .bind("0.0.0.0:1161")
305 /// // Read-only user with authentication only
306 /// .usm_user("monitor", |u| {
307 /// u.auth(AuthProtocol::Sha256, b"monitorpass123")
308 /// })
309 /// // Admin user with full encryption
310 /// .usm_user("admin", |u| {
311 /// u.auth(AuthProtocol::Sha256, b"adminauth123")
312 /// .privacy(PrivProtocol::Aes128, b"adminpriv123")
313 /// })
314 /// .build()
315 /// .await?;
316 /// # Ok(())
317 /// # }
318 /// ```
319 pub fn usm_user<F>(mut self, username: impl Into<Bytes>, configure: F) -> Self
320 where
321 F: FnOnce(UsmUserConfig) -> UsmUserConfig,
322 {
323 let username_bytes: Bytes = username.into();
324 let config = configure(UsmUserConfig::new(username_bytes.clone()));
325 self.usm_users.insert(username_bytes, config);
326 self
327 }
328
329 /// Set the engine ID for SNMPv3.
330 ///
331 /// If not set, a default engine ID will be generated based on the
332 /// RFC 3411 format using enterprise number and timestamp.
333 ///
334 /// # Example
335 ///
336 /// ```rust,no_run
337 /// use async_snmp::agent::Agent;
338 ///
339 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
340 /// let agent = Agent::builder()
341 /// .bind("0.0.0.0:1161")
342 /// .engine_id(b"\x80\x00\x00\x00\x01MyEngine".to_vec())
343 /// .community(b"public")
344 /// .build()
345 /// .await?;
346 /// # Ok(())
347 /// # }
348 /// ```
349 pub fn engine_id(mut self, engine_id: impl Into<Vec<u8>>) -> Self {
350 self.engine_id = Some(engine_id.into());
351 self
352 }
353
354 /// Set the maximum message size for responses.
355 ///
356 /// Default is 1472 octets (fits Ethernet MTU minus IP/UDP headers).
357 /// GETBULK responses will be truncated to fit within this limit.
358 ///
359 /// For SNMPv3 requests, the agent uses the minimum of this value
360 /// and the msgMaxSize from the request.
361 pub fn max_message_size(mut self, size: usize) -> Self {
362 self.max_message_size = size;
363 self
364 }
365
366 /// Set the maximum number of concurrent requests the agent will process.
367 ///
368 /// Default is 1000. Requests beyond this limit will queue until a slot
369 /// becomes available. Set to `None` for unbounded concurrency.
370 ///
371 /// This controls memory usage under high load while still allowing
372 /// parallel request processing.
373 pub fn max_concurrent_requests(mut self, limit: Option<usize>) -> Self {
374 self.max_concurrent_requests = limit;
375 self
376 }
377
378 /// Set the UDP socket receive buffer size.
379 ///
380 /// Default is 4MB. The kernel may cap this at `net.core.rmem_max`.
381 /// A larger buffer prevents packet loss during request bursts.
382 ///
383 /// Set to `None` to use the kernel default.
384 pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
385 self.recv_buffer_size = size;
386 self
387 }
388
389 /// Register a MIB handler for an OID subtree.
390 ///
391 /// Handlers are matched by longest prefix. When a request comes in,
392 /// the handler with the longest matching prefix is used.
393 ///
394 /// # Example
395 ///
396 /// ```rust,no_run
397 /// use async_snmp::agent::Agent;
398 /// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
399 /// use async_snmp::{Oid, Value, VarBind, oid};
400 /// use std::sync::Arc;
401 ///
402 /// struct SystemHandler;
403 /// impl MibHandler for SystemHandler {
404 /// fn get<'a>(&'a self, _: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
405 /// Box::pin(async move {
406 /// if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
407 /// GetResult::Value(Value::OctetString("My Agent".into()))
408 /// } else {
409 /// GetResult::NoSuchObject
410 /// }
411 /// })
412 /// }
413 /// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
414 /// Box::pin(async { GetNextResult::EndOfMibView })
415 /// }
416 /// }
417 ///
418 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
419 /// let agent = Agent::builder()
420 /// .bind("0.0.0.0:1161")
421 /// .community(b"public")
422 /// // Register handler for system MIB subtree
423 /// .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemHandler))
424 /// .build()
425 /// .await?;
426 /// # Ok(())
427 /// # }
428 /// ```
429 pub fn handler(mut self, prefix: Oid, handler: Arc<dyn MibHandler>) -> Self {
430 self.handlers.push(RegisteredHandler { prefix, handler });
431 self
432 }
433
434 /// Configure VACM (View-based Access Control Model) using a builder function.
435 ///
436 /// When VACM is enabled, all requests are checked against the configured
437 /// access control rules. Requests that don't have proper access are rejected
438 /// with `noAccess` error (v2c/v3) or `noSuchName` (v1).
439 ///
440 /// # Example
441 ///
442 /// ```rust,no_run
443 /// use async_snmp::agent::{Agent, SecurityModel, VacmBuilder};
444 /// use async_snmp::message::SecurityLevel;
445 /// use async_snmp::oid;
446 ///
447 /// # async fn example() -> Result<(), Box<async_snmp::Error>> {
448 /// let agent = Agent::builder()
449 /// .bind("0.0.0.0:161")
450 /// .community(b"public")
451 /// .community(b"private")
452 /// .vacm(|v| v
453 /// .group("public", SecurityModel::V2c, "readonly_group")
454 /// .group("private", SecurityModel::V2c, "readwrite_group")
455 /// .access("readonly_group", |a| a
456 /// .read_view("full_view"))
457 /// .access("readwrite_group", |a| a
458 /// .read_view("full_view")
459 /// .write_view("write_view"))
460 /// .view("full_view", |v| v
461 /// .include(oid!(1, 3, 6, 1)))
462 /// .view("write_view", |v| v
463 /// .include(oid!(1, 3, 6, 1, 2, 1, 1))))
464 /// .build()
465 /// .await?;
466 /// # Ok(())
467 /// # }
468 /// ```
469 pub fn vacm<F>(mut self, configure: F) -> Self
470 where
471 F: FnOnce(VacmBuilder) -> VacmBuilder,
472 {
473 let builder = VacmBuilder::new();
474 self.vacm = Some(configure(builder).build());
475 self
476 }
477
478 /// Set a cancellation token for graceful shutdown.
479 ///
480 /// If not set, the agent creates its own token accessible via `Agent::cancel()`.
481 pub fn cancel(mut self, token: CancellationToken) -> Self {
482 self.cancel = Some(token);
483 self
484 }
485
486 /// Build the agent.
487 pub async fn build(mut self) -> Result<Agent> {
488 let bind_addr: std::net::SocketAddr = self.bind_addr.parse().map_err(|_| {
489 Error::Config(format!("invalid bind address: {}", self.bind_addr).into())
490 })?;
491
492 let socket = bind_udp_socket(bind_addr, self.recv_buffer_size)
493 .await
494 .map_err(|e| Error::Network {
495 target: bind_addr,
496 source: e,
497 })?;
498
499 let local_addr = socket.local_addr().map_err(|e| Error::Network {
500 target: bind_addr,
501 source: e,
502 })?;
503
504 let socket_state =
505 UdpSocketState::new(UdpSockRef::from(&socket)).map_err(|e| Error::Network {
506 target: bind_addr,
507 source: e,
508 })?;
509
510 // Generate default engine ID if not provided
511 let engine_id = self.engine_id.unwrap_or_else(|| {
512 // RFC 3411 format: enterprise number + format + local identifier
513 // Use a simple format: 0x80 (local) + timestamp + random
514 let mut id = vec![0x80, 0x00, 0x00, 0x00, 0x01]; // Enterprise format indicator
515 let timestamp = std::time::SystemTime::now()
516 .duration_since(std::time::UNIX_EPOCH)
517 .unwrap_or_default()
518 .as_secs();
519 id.extend_from_slice(×tamp.to_be_bytes());
520 id
521 });
522
523 // Sort handlers by prefix length (longest first) for matching
524 self.handlers
525 .sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
526
527 let cancel = self.cancel.unwrap_or_default();
528
529 // Create concurrency limiter if configured
530 let concurrency_limit = self
531 .max_concurrent_requests
532 .map(|n| Arc::new(Semaphore::new(n)));
533
534 Ok(Agent {
535 inner: Arc::new(AgentInner {
536 socket: Arc::new(socket),
537 socket_state,
538 local_addr,
539 communities: self.communities,
540 usm_users: self.usm_users,
541 handlers: self.handlers,
542 engine_id,
543 engine_boots: AtomicU32::new(1),
544 engine_time: AtomicU32::new(0),
545 engine_start: Instant::now(),
546 salt_counter: SaltCounter::new(),
547 max_message_size: self.max_message_size,
548 concurrency_limit,
549 vacm: self.vacm,
550 snmp_invalid_msgs: AtomicU32::new(0),
551 snmp_unknown_security_models: AtomicU32::new(0),
552 snmp_silent_drops: AtomicU32::new(0),
553 cancel,
554 }),
555 })
556 }
557}
558
559impl Default for AgentBuilder {
560 fn default() -> Self {
561 Self::new()
562 }
563}
564
565/// Inner state shared across agent clones.
566pub(crate) struct AgentInner {
567 pub(crate) socket: Arc<UdpSocket>,
568 pub(crate) socket_state: UdpSocketState,
569 pub(crate) local_addr: SocketAddr,
570 pub(crate) communities: Vec<Vec<u8>>,
571 pub(crate) usm_users: HashMap<Bytes, UsmUserConfig>,
572 pub(crate) handlers: Vec<RegisteredHandler>,
573 pub(crate) engine_id: Vec<u8>,
574 pub(crate) engine_boots: AtomicU32,
575 pub(crate) engine_time: AtomicU32,
576 pub(crate) engine_start: Instant,
577 pub(crate) salt_counter: SaltCounter,
578 pub(crate) max_message_size: usize,
579 pub(crate) concurrency_limit: Option<Arc<Semaphore>>,
580 pub(crate) vacm: Option<VacmConfig>,
581 // RFC 3412 statistics counters
582 /// snmpInvalidMsgs (1.3.6.1.6.3.11.2.1.2) - messages with invalid msgFlags
583 /// (e.g., privacy without authentication)
584 pub(crate) snmp_invalid_msgs: AtomicU32,
585 /// snmpUnknownSecurityModels (1.3.6.1.6.3.11.2.1.1) - messages with
586 /// unrecognized security model
587 pub(crate) snmp_unknown_security_models: AtomicU32,
588 /// snmpSilentDrops (1.3.6.1.6.3.11.2.1.3) - confirmed-class PDUs silently
589 /// dropped because even an empty response would exceed max message size
590 pub(crate) snmp_silent_drops: AtomicU32,
591 /// Cancellation token for graceful shutdown.
592 pub(crate) cancel: CancellationToken,
593}
594
595/// SNMP Agent.
596///
597/// Listens for and responds to SNMP requests (GET, GETNEXT, GETBULK, SET).
598///
599/// # Example
600///
601/// ```rust,no_run
602/// use async_snmp::agent::Agent;
603/// use async_snmp::oid;
604///
605/// # async fn example() -> Result<(), Box<async_snmp::Error>> {
606/// let agent = Agent::builder()
607/// .bind("0.0.0.0:161")
608/// .community(b"public")
609/// .build()
610/// .await?;
611///
612/// agent.run().await
613/// # }
614/// ```
615pub struct Agent {
616 pub(crate) inner: Arc<AgentInner>,
617}
618
619impl Agent {
620 /// Create a builder for configuring the agent.
621 pub fn builder() -> AgentBuilder {
622 AgentBuilder::new()
623 }
624
625 /// Get the local address the agent is bound to.
626 pub fn local_addr(&self) -> SocketAddr {
627 self.inner.local_addr
628 }
629
630 /// Get the engine ID.
631 pub fn engine_id(&self) -> &[u8] {
632 &self.inner.engine_id
633 }
634
635 /// Get the cancellation token for this agent.
636 ///
637 /// Call `token.cancel()` to initiate graceful shutdown.
638 pub fn cancel(&self) -> CancellationToken {
639 self.inner.cancel.clone()
640 }
641
642 /// Get the snmpInvalidMsgs counter value.
643 ///
644 /// This counter tracks messages with invalid msgFlags, such as
645 /// privacy-without-authentication (RFC 3412 Section 7.2 Step 5d).
646 ///
647 /// OID: 1.3.6.1.6.3.11.2.1.2
648 pub fn snmp_invalid_msgs(&self) -> u32 {
649 self.inner.snmp_invalid_msgs.load(Ordering::Relaxed)
650 }
651
652 /// Get the snmpUnknownSecurityModels counter value.
653 ///
654 /// This counter tracks messages with unrecognized security models
655 /// (RFC 3412 Section 7.2 Step 2).
656 ///
657 /// OID: 1.3.6.1.6.3.11.2.1.1
658 pub fn snmp_unknown_security_models(&self) -> u32 {
659 self.inner
660 .snmp_unknown_security_models
661 .load(Ordering::Relaxed)
662 }
663
664 /// Get the snmpSilentDrops counter value.
665 ///
666 /// This counter tracks confirmed-class PDUs (GetRequest, GetNextRequest,
667 /// GetBulkRequest, SetRequest, InformRequest) that were silently dropped
668 /// because even an empty Response-PDU would exceed the maximum message
669 /// size constraint (RFC 3412 Section 7.1).
670 ///
671 /// OID: 1.3.6.1.6.3.11.2.1.3
672 pub fn snmp_silent_drops(&self) -> u32 {
673 self.inner.snmp_silent_drops.load(Ordering::Relaxed)
674 }
675
676 /// Run the agent, processing requests concurrently.
677 ///
678 /// Requests are processed in parallel up to the configured
679 /// `max_concurrent_requests` limit (default: 1000). This method runs
680 /// until the cancellation token is triggered.
681 #[instrument(skip(self), err, fields(snmp.local_addr = %self.local_addr()))]
682 pub async fn run(&self) -> Result<()> {
683 let mut buf = vec![0u8; 65535];
684
685 loop {
686 let recv_meta = tokio::select! {
687 result = self.recv_packet(&mut buf) => {
688 result?
689 }
690 _ = self.inner.cancel.cancelled() => {
691 tracing::info!(target: "async_snmp::agent", "agent shutdown requested");
692 return Ok(());
693 }
694 };
695
696 let data = Bytes::copy_from_slice(&buf[..recv_meta.len]);
697 let agent = self.clone();
698
699 let permit = if let Some(ref sem) = self.inner.concurrency_limit {
700 Some(sem.clone().acquire_owned().await.expect("semaphore closed"))
701 } else {
702 None
703 };
704
705 tokio::spawn(async move {
706 agent.update_engine_time();
707
708 match agent.handle_request(data, recv_meta.addr).await {
709 Ok(Some(response_bytes)) => {
710 if let Err(e) = agent.send_response(&response_bytes, &recv_meta).await {
711 tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "failed to send response");
712 }
713 }
714 Ok(None) => {}
715 Err(e) => {
716 tracing::warn!(target: "async_snmp::agent", { snmp.source = %recv_meta.addr, error = %e }, "error handling request");
717 }
718 }
719
720 drop(permit);
721 });
722 }
723 }
724
725 async fn recv_packet(&self, buf: &mut [u8]) -> Result<RecvMeta> {
726 let mut iov = [IoSliceMut::new(buf)];
727 let mut meta = [RecvMeta::default()];
728
729 loop {
730 self.inner
731 .socket
732 .readable()
733 .await
734 .map_err(|e| Error::Network {
735 target: self.inner.local_addr,
736 source: e,
737 })?;
738
739 let result = self.inner.socket.try_io(tokio::io::Interest::READABLE, || {
740 let sref = UdpSockRef::from(&*self.inner.socket);
741 self.inner.socket_state.recv(sref, &mut iov, &mut meta)
742 });
743
744 match result {
745 Ok(n) if n > 0 => return Ok(meta[0]),
746 Ok(_) => continue,
747 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
748 Err(e) => {
749 return Err(Error::Network {
750 target: self.inner.local_addr,
751 source: e,
752 }
753 .boxed());
754 }
755 }
756 }
757 }
758
759 async fn send_response(&self, data: &[u8], recv_meta: &RecvMeta) -> std::io::Result<()> {
760 let transmit = Transmit {
761 destination: recv_meta.addr,
762 ecn: None,
763 contents: data,
764 segment_size: None,
765 src_ip: recv_meta.dst_ip,
766 };
767
768 loop {
769 self.inner.socket.writable().await?;
770
771 let result = self.inner.socket.try_io(tokio::io::Interest::WRITABLE, || {
772 let sref = UdpSockRef::from(&*self.inner.socket);
773 self.inner.socket_state.try_send(sref, &transmit)
774 });
775
776 match result {
777 Ok(()) => return Ok(()),
778 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
779 Err(e) => return Err(e),
780 }
781 }
782 }
783
784 /// Process a single request and return the response bytes.
785 ///
786 /// Returns `None` if no response should be sent.
787 async fn handle_request(&self, data: Bytes, source: SocketAddr) -> Result<Option<Bytes>> {
788 // Peek at version
789 let mut decoder = Decoder::with_target(data.clone(), source);
790 let mut seq = decoder.read_sequence()?;
791 let version_num = seq.read_integer()?;
792 let version = Version::from_i32(version_num).ok_or_else(|| {
793 tracing::debug!(target: "async_snmp::agent", { source = %source, kind = %DecodeErrorKind::UnknownVersion(version_num) }, "unknown SNMP version");
794 Error::MalformedResponse { target: source }.boxed()
795 })?;
796 drop(seq);
797 drop(decoder);
798
799 match version {
800 Version::V1 => self.handle_v1(data, source).await,
801 Version::V2c => self.handle_v2c(data, source).await,
802 Version::V3 => self.handle_v3(data, source).await,
803 }
804 }
805
806 /// Update engine time based on elapsed time since start.
807 fn update_engine_time(&self) {
808 let elapsed = self.inner.engine_start.elapsed().as_secs() as u32;
809 self.inner.engine_time.store(elapsed, Ordering::Relaxed);
810 }
811
812 /// Validate community string using constant-time comparison.
813 ///
814 /// Uses constant-time comparison to prevent timing attacks that could
815 /// be used to guess valid community strings character by character.
816 pub(crate) fn validate_community(&self, community: &[u8]) -> bool {
817 if self.inner.communities.is_empty() {
818 // No communities configured = reject all
819 return false;
820 }
821 // Use constant-time comparison for each community string.
822 // We compare against all configured communities regardless of
823 // early matches to maintain constant-time behavior.
824 let mut valid = false;
825 for configured in &self.inner.communities {
826 // ct_eq returns a Choice, which we convert to bool after comparison
827 if configured.len() == community.len()
828 && bool::from(configured.as_slice().ct_eq(community))
829 {
830 valid = true;
831 }
832 }
833 valid
834 }
835
836 /// Dispatch a request to the appropriate handler.
837 async fn dispatch_request(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
838 match pdu.pdu_type {
839 PduType::GetRequest => self.handle_get(ctx, pdu).await,
840 PduType::GetNextRequest => self.handle_get_next(ctx, pdu).await,
841 PduType::GetBulkRequest => self.handle_get_bulk(ctx, pdu).await,
842 PduType::SetRequest => self.handle_set(ctx, pdu).await,
843 PduType::InformRequest => self.handle_inform(pdu),
844 _ => {
845 // Should not happen - filtered earlier
846 Ok(pdu.to_error_response(ErrorStatus::GenErr, 0))
847 }
848 }
849 }
850
851 /// Handle InformRequest PDU.
852 ///
853 /// Per RFC 3416 Section 4.2.7, an InformRequest is a confirmed-class PDU
854 /// that the receiver acknowledges by returning a Response with the same
855 /// request-id and varbind list.
856 fn handle_inform(&self, pdu: &Pdu) -> Result<Pdu> {
857 // Simply acknowledge by returning the same varbinds in a Response
858 Ok(Pdu {
859 pdu_type: PduType::Response,
860 request_id: pdu.request_id,
861 error_status: 0,
862 error_index: 0,
863 varbinds: pdu.varbinds.clone(),
864 })
865 }
866
867 /// Handle GET request.
868 async fn handle_get(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
869 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
870
871 for (index, vb) in pdu.varbinds.iter().enumerate() {
872 // VACM read access check
873 if let Some(ref vacm) = self.inner.vacm
874 && !vacm.check_access(ctx.read_view.as_ref(), &vb.oid)
875 {
876 // v1: noSuchName, v2c/v3: noAccess or NoSuchObject
877 if ctx.version == Version::V1 {
878 return Ok(Pdu {
879 pdu_type: PduType::Response,
880 request_id: pdu.request_id,
881 error_status: ErrorStatus::NoSuchName.as_i32(),
882 error_index: (index + 1) as i32,
883 varbinds: pdu.varbinds.clone(),
884 });
885 } else {
886 // For GET, return NoSuchObject for inaccessible OIDs per RFC 3415
887 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::NoSuchObject));
888 continue;
889 }
890 }
891
892 let result = if let Some(handler) = self.find_handler(&vb.oid) {
893 handler.handler.get(ctx, &vb.oid).await
894 } else {
895 GetResult::NoSuchObject
896 };
897
898 let response_value = match result {
899 GetResult::Value(v) => v,
900 GetResult::NoSuchObject => {
901 // v1 returns noSuchName error, v2c/v3 returns NoSuchObject exception
902 if ctx.version == Version::V1 {
903 return Ok(Pdu {
904 pdu_type: PduType::Response,
905 request_id: pdu.request_id,
906 error_status: ErrorStatus::NoSuchName.as_i32(),
907 error_index: (index + 1) as i32,
908 varbinds: pdu.varbinds.clone(),
909 });
910 } else {
911 Value::NoSuchObject
912 }
913 }
914 GetResult::NoSuchInstance => {
915 // v1 returns noSuchName error, v2c/v3 returns NoSuchInstance exception
916 if ctx.version == Version::V1 {
917 return Ok(Pdu {
918 pdu_type: PduType::Response,
919 request_id: pdu.request_id,
920 error_status: ErrorStatus::NoSuchName.as_i32(),
921 error_index: (index + 1) as i32,
922 varbinds: pdu.varbinds.clone(),
923 });
924 } else {
925 Value::NoSuchInstance
926 }
927 }
928 };
929
930 response_varbinds.push(VarBind::new(vb.oid.clone(), response_value));
931 }
932
933 Ok(Pdu {
934 pdu_type: PduType::Response,
935 request_id: pdu.request_id,
936 error_status: 0,
937 error_index: 0,
938 varbinds: response_varbinds,
939 })
940 }
941
942 /// Handle GETNEXT request.
943 async fn handle_get_next(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
944 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
945
946 for (index, vb) in pdu.varbinds.iter().enumerate() {
947 // Try to find the next OID from any handler
948 let next = self.get_next_oid(ctx, &vb.oid).await;
949
950 // Check VACM access for the returned OID (if VACM enabled)
951 let next = if let Some(ref next_vb) = next {
952 if let Some(ref vacm) = self.inner.vacm {
953 if vacm.check_access(ctx.read_view.as_ref(), &next_vb.oid) {
954 next
955 } else {
956 // OID not accessible, continue searching
957 // For simplicity, we just return EndOfMibView here
958 // A more complete implementation would continue the search
959 None
960 }
961 } else {
962 next
963 }
964 } else {
965 next
966 };
967
968 match next {
969 Some(next_vb) => {
970 response_varbinds.push(next_vb);
971 }
972 None => {
973 // v1 returns noSuchName, v2c/v3 returns endOfMibView
974 if ctx.version == Version::V1 {
975 return Ok(Pdu {
976 pdu_type: PduType::Response,
977 request_id: pdu.request_id,
978 error_status: ErrorStatus::NoSuchName.as_i32(),
979 error_index: (index + 1) as i32,
980 varbinds: pdu.varbinds.clone(),
981 });
982 } else {
983 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::EndOfMibView));
984 }
985 }
986 }
987 }
988
989 Ok(Pdu {
990 pdu_type: PduType::Response,
991 request_id: pdu.request_id,
992 error_status: 0,
993 error_index: 0,
994 varbinds: response_varbinds,
995 })
996 }
997
998 /// Handle GETBULK request.
999 ///
1000 /// Per RFC 3416 Section 4.2.3, if the response would exceed the message
1001 /// size limit, we return fewer variable bindings rather than all of them.
1002 async fn handle_get_bulk(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
1003 // For GETBULK, error_status is non_repeaters and error_index is max_repetitions
1004 let non_repeaters = pdu.error_status.max(0) as usize;
1005 let max_repetitions = pdu.error_index.max(0) as usize;
1006
1007 let mut response_varbinds = Vec::new();
1008 let mut current_size: usize = RESPONSE_OVERHEAD;
1009 let max_size = self.inner.max_message_size;
1010
1011 // Helper to check if we can add a varbind
1012 let can_add = |vb: &VarBind, current_size: usize| -> bool {
1013 current_size + vb.encoded_size() <= max_size
1014 };
1015
1016 // Handle non-repeaters (first N varbinds get one GETNEXT each)
1017 for vb in pdu.varbinds.iter().take(non_repeaters) {
1018 let next_vb = match self.get_next_oid(ctx, &vb.oid).await {
1019 Some(next_vb) => next_vb,
1020 None => VarBind::new(vb.oid.clone(), Value::EndOfMibView),
1021 };
1022
1023 if !can_add(&next_vb, current_size) {
1024 // Can't fit even non-repeaters, return tooBig if we have nothing
1025 if response_varbinds.is_empty() {
1026 return Ok(Pdu {
1027 pdu_type: PduType::Response,
1028 request_id: pdu.request_id,
1029 error_status: ErrorStatus::TooBig.as_i32(),
1030 error_index: 0,
1031 varbinds: pdu.varbinds.clone(),
1032 });
1033 }
1034 // Otherwise return what we have
1035 break;
1036 }
1037
1038 current_size += next_vb.encoded_size();
1039 response_varbinds.push(next_vb);
1040 }
1041
1042 // Handle repeaters
1043 if non_repeaters < pdu.varbinds.len() {
1044 let repeaters = &pdu.varbinds[non_repeaters..];
1045 let mut current_oids: Vec<Oid> = repeaters.iter().map(|vb| vb.oid.clone()).collect();
1046 let mut all_done = vec![false; repeaters.len()];
1047
1048 'outer: for _ in 0..max_repetitions {
1049 let mut row_complete = true;
1050 for (i, oid) in current_oids.iter_mut().enumerate() {
1051 let next_vb = if all_done[i] {
1052 VarBind::new(oid.clone(), Value::EndOfMibView)
1053 } else {
1054 match self.get_next_oid(ctx, oid).await {
1055 Some(next_vb) => {
1056 *oid = next_vb.oid.clone();
1057 row_complete = false;
1058 next_vb
1059 }
1060 None => {
1061 all_done[i] = true;
1062 VarBind::new(oid.clone(), Value::EndOfMibView)
1063 }
1064 }
1065 };
1066
1067 // Check size before adding
1068 if !can_add(&next_vb, current_size) {
1069 // Can't fit more, return what we have
1070 break 'outer;
1071 }
1072
1073 current_size += next_vb.encoded_size();
1074 response_varbinds.push(next_vb);
1075 }
1076
1077 if row_complete {
1078 break;
1079 }
1080 }
1081 }
1082
1083 Ok(Pdu {
1084 pdu_type: PduType::Response,
1085 request_id: pdu.request_id,
1086 error_status: 0,
1087 error_index: 0,
1088 varbinds: response_varbinds,
1089 })
1090 }
1091
1092 /// Find the handler for a given OID.
1093 pub(crate) fn find_handler(&self, oid: &Oid) -> Option<&RegisteredHandler> {
1094 // Handlers are sorted by prefix length (longest first)
1095 self.inner
1096 .handlers
1097 .iter()
1098 .find(|&handler| handler.handler.handles(&handler.prefix, oid))
1099 .map(|v| v as _)
1100 }
1101
1102 /// Get the next OID from any handler.
1103 async fn get_next_oid(&self, ctx: &RequestContext, oid: &Oid) -> Option<VarBind> {
1104 // Find the first handler that can provide a next OID
1105 let mut best_result: Option<VarBind> = None;
1106
1107 for handler in &self.inner.handlers {
1108 if let GetNextResult::Value(next) = handler.handler.get_next(ctx, oid).await {
1109 // Must be lexicographically greater than the request OID
1110 if next.oid > *oid {
1111 match &best_result {
1112 None => best_result = Some(next),
1113 Some(current) if next.oid < current.oid => best_result = Some(next),
1114 _ => {}
1115 }
1116 }
1117 }
1118 }
1119
1120 best_result
1121 }
1122}
1123
1124impl Clone for Agent {
1125 fn clone(&self) -> Self {
1126 Self {
1127 inner: Arc::clone(&self.inner),
1128 }
1129 }
1130}
1131
1132#[cfg(test)]
1133mod tests {
1134 use super::*;
1135 use crate::handler::{
1136 BoxFuture, GetNextResult, GetResult, MibHandler, RequestContext, SecurityModel, SetResult,
1137 };
1138 use crate::message::SecurityLevel;
1139 use crate::oid;
1140
1141 struct TestHandler;
1142
1143 impl MibHandler for TestHandler {
1144 fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
1145 Box::pin(async move {
1146 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0) {
1147 return GetResult::Value(Value::Integer(42));
1148 }
1149 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0) {
1150 return GetResult::Value(Value::OctetString(Bytes::from_static(b"test")));
1151 }
1152 GetResult::NoSuchObject
1153 })
1154 }
1155
1156 fn get_next<'a>(
1157 &'a self,
1158 _ctx: &'a RequestContext,
1159 oid: &'a Oid,
1160 ) -> BoxFuture<'a, GetNextResult> {
1161 Box::pin(async move {
1162 let oid1 = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
1163 let oid2 = oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0);
1164
1165 if oid < &oid1 {
1166 return GetNextResult::Value(VarBind::new(oid1, Value::Integer(42)));
1167 }
1168 if oid < &oid2 {
1169 return GetNextResult::Value(VarBind::new(
1170 oid2,
1171 Value::OctetString(Bytes::from_static(b"test")),
1172 ));
1173 }
1174 GetNextResult::EndOfMibView
1175 })
1176 }
1177 }
1178
1179 fn test_ctx() -> RequestContext {
1180 RequestContext {
1181 source: "127.0.0.1:12345".parse().unwrap(),
1182 version: Version::V2c,
1183 security_model: SecurityModel::V2c,
1184 security_name: Bytes::from_static(b"public"),
1185 security_level: SecurityLevel::NoAuthNoPriv,
1186 context_name: Bytes::new(),
1187 request_id: 1,
1188 pdu_type: PduType::GetRequest,
1189 group_name: None,
1190 read_view: None,
1191 write_view: None,
1192 }
1193 }
1194
1195 #[test]
1196 fn test_agent_builder_defaults() {
1197 let builder = AgentBuilder::new();
1198 assert_eq!(builder.bind_addr, "0.0.0.0:161");
1199 assert!(builder.communities.is_empty());
1200 assert!(builder.usm_users.is_empty());
1201 assert!(builder.handlers.is_empty());
1202 }
1203
1204 #[test]
1205 fn test_agent_builder_community() {
1206 let builder = AgentBuilder::new()
1207 .community(b"public")
1208 .community(b"private");
1209 assert_eq!(builder.communities.len(), 2);
1210 }
1211
1212 #[test]
1213 fn test_agent_builder_communities() {
1214 let builder = AgentBuilder::new().communities(["public", "private"]);
1215 assert_eq!(builder.communities.len(), 2);
1216 }
1217
1218 #[test]
1219 fn test_agent_builder_handler() {
1220 let builder =
1221 AgentBuilder::new().handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(TestHandler));
1222 assert_eq!(builder.handlers.len(), 1);
1223 }
1224
1225 #[tokio::test]
1226 async fn test_mib_handler_default_set() {
1227 let handler = TestHandler;
1228 let mut ctx = test_ctx();
1229 ctx.pdu_type = PduType::SetRequest;
1230
1231 let result = handler
1232 .test_set(&ctx, &oid!(1, 3, 6, 1), &Value::Integer(1))
1233 .await;
1234 assert_eq!(result, SetResult::NotWritable);
1235 }
1236
1237 #[test]
1238 fn test_mib_handler_handles() {
1239 let handler = TestHandler;
1240 let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);
1241
1242 // OID within prefix
1243 assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0)));
1244
1245 // OID before prefix (GETNEXT should still try)
1246 assert!(handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 99998)));
1247
1248 // OID after prefix (not handled)
1249 assert!(!handler.handles(&prefix, &oid!(1, 3, 6, 1, 4, 1, 100000)));
1250 }
1251
1252 #[tokio::test]
1253 async fn test_test_handler_get() {
1254 let handler = TestHandler;
1255 let ctx = test_ctx();
1256
1257 // Existing OID
1258 let result = handler
1259 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1260 .await;
1261 assert!(matches!(result, GetResult::Value(Value::Integer(42))));
1262
1263 // Non-existing OID
1264 let result = handler
1265 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 99, 0))
1266 .await;
1267 assert!(matches!(result, GetResult::NoSuchObject));
1268 }
1269
1270 #[tokio::test]
1271 async fn test_test_handler_get_next() {
1272 let handler = TestHandler;
1273 let mut ctx = test_ctx();
1274 ctx.pdu_type = PduType::GetNextRequest;
1275
1276 // Before first OID
1277 let next = handler.get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999)).await;
1278 assert!(next.is_value());
1279 if let GetNextResult::Value(vb) = next {
1280 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0));
1281 }
1282
1283 // Between OIDs
1284 let next = handler
1285 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1286 .await;
1287 assert!(next.is_value());
1288 if let GetNextResult::Value(vb) = next {
1289 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0));
1290 }
1291
1292 // After last OID
1293 let next = handler
1294 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0))
1295 .await;
1296 assert!(next.is_end_of_mib_view());
1297 }
1298}