async_snmp/agent/mod.rs
1//! SNMP Agent (RFC 3413).
2//!
3//! This module provides SNMP agent functionality for responding to
4//! GET, GETNEXT, GETBULK, and SET requests.
5//!
6//! # Features
7//!
8//! - **Async handlers**: All handler methods are async for database queries, network calls, etc.
9//! - **Atomic SET**: Two-phase commit protocol (test/commit/undo) per RFC 3416
10//! - **VACM support**: Optional View-based Access Control Model (RFC 3415)
11//!
12//! # Example
13//!
14//! ```rust,no_run
15//! use async_snmp::agent::Agent;
16//! use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
17//! use async_snmp::{Oid, Value, VarBind, oid};
18//! use std::sync::Arc;
19//!
20//! // Define a simple handler for the system MIB subtree
21//! struct SystemMibHandler;
22//!
23//! impl MibHandler for SystemMibHandler {
24//! fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
25//! Box::pin(async move {
26//! // sysDescr.0
27//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
28//! return GetResult::Value(Value::OctetString("My SNMP Agent".into()));
29//! }
30//! // sysObjectID.0
31//! if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 2, 0) {
32//! return GetResult::Value(Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999)));
33//! }
34//! GetResult::NoSuchObject
35//! })
36//! }
37//!
38//! fn get_next<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetNextResult> {
39//! Box::pin(async move {
40//! // Return the lexicographically next OID after the given one
41//! let sys_descr = oid!(1, 3, 6, 1, 2, 1, 1, 1, 0);
42//! let sys_object_id = oid!(1, 3, 6, 1, 2, 1, 1, 2, 0);
43//!
44//! if oid < &sys_descr {
45//! return GetNextResult::Value(VarBind::new(sys_descr, Value::OctetString("My SNMP Agent".into())));
46//! }
47//! if oid < &sys_object_id {
48//! return GetNextResult::Value(VarBind::new(sys_object_id, Value::ObjectIdentifier(oid!(1, 3, 6, 1, 4, 1, 99999))));
49//! }
50//! GetNextResult::EndOfMibView
51//! })
52//! }
53//! }
54//!
55//! #[tokio::main]
56//! async fn main() -> Result<(), async_snmp::Error> {
57//! let agent = Agent::builder()
58//! .bind("0.0.0.0:161")
59//! .community(b"public")
60//! .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemMibHandler))
61//! .build()
62//! .await?;
63//!
64//! agent.run().await
65//! }
66//! ```
67
68mod request;
69mod response;
70mod set_handler;
71pub mod vacm;
72
73pub use vacm::{SecurityModel, VacmBuilder, VacmConfig, View, ViewCheckResult, ViewSubtree};
74
75use std::collections::HashMap;
76use std::net::SocketAddr;
77use std::sync::Arc;
78use std::sync::atomic::{AtomicU32, Ordering};
79use std::time::Instant;
80
81use bytes::Bytes;
82use subtle::ConstantTimeEq;
83use tokio::net::UdpSocket;
84use tokio::sync::Semaphore;
85use tokio_util::sync::CancellationToken;
86use tracing::instrument;
87
88use std::io::IoSliceMut;
89
90use quinn_udp::{RecvMeta, Transmit, UdpSockRef, UdpSocketState};
91
92use crate::ber::Decoder;
93use crate::error::{DecodeErrorKind, Error, ErrorStatus, Result};
94use crate::handler::{GetNextResult, GetResult, MibHandler, RequestContext};
95use crate::notification::UsmUserConfig;
96use crate::oid::Oid;
97use crate::pdu::{Pdu, PduType};
98use crate::util::bind_udp_socket;
99use crate::v3::SaltCounter;
100use crate::value::Value;
101use crate::varbind::VarBind;
102use crate::version::Version;
103
/// Default maximum message size for UDP (RFC 3417 recommendation).
///
/// Written as a 1500-byte Ethernet MTU minus a 20-byte IPv4 header and an
/// 8-byte UDP header, which evaluates to 1472.
const DEFAULT_MAX_MESSAGE_SIZE: usize = 1500 - 20 - 8;

/// Approximate, conservative allowance for SNMP message framing
/// (version, community/USM, PDU headers, etc.) used when sizing responses.
const RESPONSE_OVERHEAD: usize = 100;
110
/// Registered handler with its OID prefix.
///
/// One entry is created per `AgentBuilder::handler` call. The builder sorts
/// the entries by prefix length (longest first) before the agent is built.
pub(crate) struct RegisteredHandler {
    /// Root of the OID subtree this handler was registered for.
    pub(crate) prefix: Oid,
    /// Shared handler implementation invoked for requests routed to it.
    pub(crate) handler: Arc<dyn MibHandler>,
}
116
117/// Builder for [`Agent`].
118///
119/// Use this builder to configure and construct an SNMP agent. The builder
120/// pattern allows you to chain configuration methods before calling
121/// [`build()`](AgentBuilder::build) to create the agent.
122///
123/// # Minimal Example
124///
125/// ```rust,no_run
126/// use async_snmp::agent::Agent;
127/// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
128/// use async_snmp::{Oid, Value, VarBind, oid};
129/// use std::sync::Arc;
130///
131/// struct MyHandler;
132/// impl MibHandler for MyHandler {
133/// fn get<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetResult> {
134/// Box::pin(async { GetResult::NoSuchObject })
135/// }
136/// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
137/// Box::pin(async { GetNextResult::EndOfMibView })
138/// }
139/// }
140///
141/// # async fn example() -> Result<(), async_snmp::Error> {
142/// let agent = Agent::builder()
143/// .bind("0.0.0.0:1161") // Use non-privileged port
144/// .community(b"public")
145/// .handler(oid!(1, 3, 6, 1, 4, 1, 99999), Arc::new(MyHandler))
146/// .build()
147/// .await?;
148/// # Ok(())
149/// # }
150/// ```
pub struct AgentBuilder {
    /// Bind address as text; parsed into a socket address by `build()`.
    bind_addr: String,
    /// Community strings accepted for v1/v2c requests.
    communities: Vec<Vec<u8>>,
    /// SNMPv3 USM users, keyed by user name bytes.
    usm_users: HashMap<Bytes, UsmUserConfig>,
    /// Handlers in registration order; `build()` sorts them longest prefix first.
    handlers: Vec<RegisteredHandler>,
    /// Explicit engine ID; when `None`, `build()` generates one.
    engine_id: Option<Vec<u8>>,
    /// Maximum message size for responses, in bytes.
    max_message_size: usize,
    /// Concurrency cap; `None` means no limiter is created.
    max_concurrent_requests: Option<usize>,
    /// Receive buffer size handed to the socket bind helper; `None` leaves it unset.
    recv_buffer_size: Option<usize>,
    /// Optional VACM configuration.
    vacm: Option<VacmConfig>,
    /// Caller-supplied cancellation token; when `None`, `build()` uses a default token.
    cancel: Option<CancellationToken>,
}
163
164impl AgentBuilder {
165 /// Create a new builder with default settings.
166 ///
167 /// Defaults:
168 /// - Bind address: `0.0.0.0:161` (UDP)
169 /// - Max message size: 1472 bytes (Ethernet MTU - IP/UDP headers)
170 /// - Max concurrent requests: 1000
171 /// - Receive buffer size: 4MB (requested from kernel)
172 /// - No communities or USM users (all requests rejected)
173 /// - No handlers registered
174 pub fn new() -> Self {
175 Self {
176 bind_addr: "0.0.0.0:161".to_string(),
177 communities: Vec::new(),
178 usm_users: HashMap::new(),
179 handlers: Vec::new(),
180 engine_id: None,
181 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
182 max_concurrent_requests: Some(1000),
183 recv_buffer_size: Some(4 * 1024 * 1024), // 4MB
184 vacm: None,
185 cancel: None,
186 }
187 }
188
189 /// Set the UDP bind address.
190 ///
191 /// Default is `0.0.0.0:161` (standard SNMP agent port). Note that binding
192 /// to UDP port 161 typically requires root/administrator privileges.
193 ///
194 /// # IPv4 Examples
195 ///
196 /// ```rust,no_run
197 /// use async_snmp::agent::Agent;
198 ///
199 /// # async fn example() -> Result<(), async_snmp::Error> {
200 /// // Bind to all IPv4 interfaces on standard port (requires privileges)
201 /// let agent = Agent::builder().bind("0.0.0.0:161").community(b"public").build().await?;
202 ///
203 /// // Bind to localhost only on non-privileged port
204 /// let agent = Agent::builder().bind("127.0.0.1:1161").community(b"public").build().await?;
205 ///
206 /// // Bind to specific interface
207 /// let agent = Agent::builder().bind("192.168.1.100:161").community(b"public").build().await?;
208 /// # Ok(())
209 /// # }
210 /// ```
211 ///
212 /// # IPv6 / Dual-Stack Examples
213 ///
214 /// ```rust,no_run
215 /// use async_snmp::agent::Agent;
216 ///
217 /// # async fn example() -> Result<(), async_snmp::Error> {
218 /// // Bind to all interfaces via dual-stack (handles both IPv4 and IPv6)
219 /// let agent = Agent::builder().bind("[::]:161").community(b"public").build().await?;
220 ///
221 /// // Bind to IPv6 localhost only
222 /// let agent = Agent::builder().bind("[::1]:1161").community(b"public").build().await?;
223 /// # Ok(())
224 /// # }
225 /// ```
226 pub fn bind(mut self, addr: impl Into<String>) -> Self {
227 self.bind_addr = addr.into();
228 self
229 }
230
231 /// Add an accepted community string for v1/v2c requests.
232 ///
233 /// Multiple communities can be added. If none are added,
234 /// all v1/v2c requests are rejected.
235 ///
236 /// # Example
237 ///
238 /// ```rust,no_run
239 /// use async_snmp::agent::Agent;
240 ///
241 /// # async fn example() -> Result<(), async_snmp::Error> {
242 /// let agent = Agent::builder()
243 /// .bind("0.0.0.0:1161")
244 /// .community(b"public") // Read-only access
245 /// .community(b"private") // Read-write access (with VACM)
246 /// .build()
247 /// .await?;
248 /// # Ok(())
249 /// # }
250 /// ```
251 pub fn community(mut self, community: &[u8]) -> Self {
252 self.communities.push(community.to_vec());
253 self
254 }
255
256 /// Add multiple community strings.
257 ///
258 /// # Example
259 ///
260 /// ```rust,no_run
261 /// use async_snmp::agent::Agent;
262 ///
263 /// # async fn example() -> Result<(), async_snmp::Error> {
264 /// let communities = ["public", "private", "monitor"];
265 /// let agent = Agent::builder()
266 /// .bind("0.0.0.0:1161")
267 /// .communities(communities)
268 /// .build()
269 /// .await?;
270 /// # Ok(())
271 /// # }
272 /// ```
273 pub fn communities<I, C>(mut self, communities: I) -> Self
274 where
275 I: IntoIterator<Item = C>,
276 C: AsRef<[u8]>,
277 {
278 for c in communities {
279 self.communities.push(c.as_ref().to_vec());
280 }
281 self
282 }
283
284 /// Add a USM user for SNMPv3 authentication.
285 ///
286 /// Configure authentication and privacy settings using the closure.
287 /// Multiple users can be added with different security levels.
288 ///
289 /// # Security Levels
290 ///
291 /// - **noAuthNoPriv**: No authentication or encryption (not recommended)
292 /// - **authNoPriv**: Authentication only (HMAC verification)
293 /// - **authPriv**: Authentication and encryption (most secure)
294 ///
295 /// # Example
296 ///
297 /// ```rust,no_run
298 /// use async_snmp::agent::Agent;
299 /// use async_snmp::{AuthProtocol, PrivProtocol};
300 ///
301 /// # async fn example() -> Result<(), async_snmp::Error> {
302 /// let agent = Agent::builder()
303 /// .bind("0.0.0.0:1161")
304 /// // Read-only user with authentication only
305 /// .usm_user("monitor", |u| {
306 /// u.auth(AuthProtocol::Sha256, b"monitorpass123")
307 /// })
308 /// // Admin user with full encryption
309 /// .usm_user("admin", |u| {
310 /// u.auth(AuthProtocol::Sha256, b"adminauth123")
311 /// .privacy(PrivProtocol::Aes128, b"adminpriv123")
312 /// })
313 /// .build()
314 /// .await?;
315 /// # Ok(())
316 /// # }
317 /// ```
318 pub fn usm_user<F>(mut self, username: impl Into<Bytes>, configure: F) -> Self
319 where
320 F: FnOnce(UsmUserConfig) -> UsmUserConfig,
321 {
322 let username_bytes: Bytes = username.into();
323 let config = configure(UsmUserConfig::new(username_bytes.clone()));
324 self.usm_users.insert(username_bytes, config);
325 self
326 }
327
328 /// Set the engine ID for SNMPv3.
329 ///
330 /// If not set, a default engine ID will be generated based on the
331 /// RFC 3411 format using enterprise number and timestamp.
332 ///
333 /// # Example
334 ///
335 /// ```rust,no_run
336 /// use async_snmp::agent::Agent;
337 ///
338 /// # async fn example() -> Result<(), async_snmp::Error> {
339 /// let agent = Agent::builder()
340 /// .bind("0.0.0.0:1161")
341 /// .engine_id(b"\x80\x00\x00\x00\x01MyEngine".to_vec())
342 /// .community(b"public")
343 /// .build()
344 /// .await?;
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub fn engine_id(mut self, engine_id: impl Into<Vec<u8>>) -> Self {
349 self.engine_id = Some(engine_id.into());
350 self
351 }
352
353 /// Set the maximum message size for responses.
354 ///
355 /// Default is 1472 octets (fits Ethernet MTU minus IP/UDP headers).
356 /// GETBULK responses will be truncated to fit within this limit.
357 ///
358 /// For SNMPv3 requests, the agent uses the minimum of this value
359 /// and the msgMaxSize from the request.
360 pub fn max_message_size(mut self, size: usize) -> Self {
361 self.max_message_size = size;
362 self
363 }
364
365 /// Set the maximum number of concurrent requests the agent will process.
366 ///
367 /// Default is 1000. Requests beyond this limit will queue until a slot
368 /// becomes available. Set to `None` for unbounded concurrency.
369 ///
370 /// This controls memory usage under high load while still allowing
371 /// parallel request processing.
372 pub fn max_concurrent_requests(mut self, limit: Option<usize>) -> Self {
373 self.max_concurrent_requests = limit;
374 self
375 }
376
377 /// Set the UDP socket receive buffer size.
378 ///
379 /// Default is 4MB. The kernel may cap this at `net.core.rmem_max`.
380 /// A larger buffer prevents packet loss during request bursts.
381 ///
382 /// Set to `None` to use the kernel default.
383 pub fn recv_buffer_size(mut self, size: Option<usize>) -> Self {
384 self.recv_buffer_size = size;
385 self
386 }
387
388 /// Register a MIB handler for an OID subtree.
389 ///
390 /// Handlers are matched by longest prefix. When a request comes in,
391 /// the handler with the longest matching prefix is used.
392 ///
393 /// # Example
394 ///
395 /// ```rust,no_run
396 /// use async_snmp::agent::Agent;
397 /// use async_snmp::handler::{MibHandler, RequestContext, GetResult, GetNextResult, BoxFuture};
398 /// use async_snmp::{Oid, Value, VarBind, oid};
399 /// use std::sync::Arc;
400 ///
401 /// struct SystemHandler;
402 /// impl MibHandler for SystemHandler {
403 /// fn get<'a>(&'a self, _: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
404 /// Box::pin(async move {
405 /// if oid == &oid!(1, 3, 6, 1, 2, 1, 1, 1, 0) {
406 /// GetResult::Value(Value::OctetString("My Agent".into()))
407 /// } else {
408 /// GetResult::NoSuchObject
409 /// }
410 /// })
411 /// }
412 /// fn get_next<'a>(&'a self, _: &'a RequestContext, _: &'a Oid) -> BoxFuture<'a, GetNextResult> {
413 /// Box::pin(async { GetNextResult::EndOfMibView })
414 /// }
415 /// }
416 ///
417 /// # async fn example() -> Result<(), async_snmp::Error> {
418 /// let agent = Agent::builder()
419 /// .bind("0.0.0.0:1161")
420 /// .community(b"public")
421 /// // Register handler for system MIB subtree
422 /// .handler(oid!(1, 3, 6, 1, 2, 1, 1), Arc::new(SystemHandler))
423 /// .build()
424 /// .await?;
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub fn handler(mut self, prefix: Oid, handler: Arc<dyn MibHandler>) -> Self {
429 self.handlers.push(RegisteredHandler { prefix, handler });
430 self
431 }
432
433 /// Configure VACM (View-based Access Control Model) using a builder function.
434 ///
435 /// When VACM is enabled, all requests are checked against the configured
436 /// access control rules. Requests that don't have proper access are rejected
437 /// with `noAccess` error (v2c/v3) or `noSuchName` (v1).
438 ///
439 /// # Example
440 ///
441 /// ```rust,no_run
442 /// use async_snmp::agent::{Agent, SecurityModel, VacmBuilder};
443 /// use async_snmp::message::SecurityLevel;
444 /// use async_snmp::oid;
445 ///
446 /// # async fn example() -> Result<(), async_snmp::Error> {
447 /// let agent = Agent::builder()
448 /// .bind("0.0.0.0:161")
449 /// .community(b"public")
450 /// .community(b"private")
451 /// .vacm(|v| v
452 /// .group("public", SecurityModel::V2c, "readonly_group")
453 /// .group("private", SecurityModel::V2c, "readwrite_group")
454 /// .access("readonly_group", |a| a
455 /// .read_view("full_view"))
456 /// .access("readwrite_group", |a| a
457 /// .read_view("full_view")
458 /// .write_view("write_view"))
459 /// .view("full_view", |v| v
460 /// .include(oid!(1, 3, 6, 1)))
461 /// .view("write_view", |v| v
462 /// .include(oid!(1, 3, 6, 1, 2, 1, 1))))
463 /// .build()
464 /// .await?;
465 /// # Ok(())
466 /// # }
467 /// ```
468 pub fn vacm<F>(mut self, configure: F) -> Self
469 where
470 F: FnOnce(VacmBuilder) -> VacmBuilder,
471 {
472 let builder = VacmBuilder::new();
473 self.vacm = Some(configure(builder).build());
474 self
475 }
476
477 /// Set a cancellation token for graceful shutdown.
478 ///
479 /// If not set, the agent creates its own token accessible via `Agent::cancel()`.
480 pub fn cancel(mut self, token: CancellationToken) -> Self {
481 self.cancel = Some(token);
482 self
483 }
484
485 /// Build the agent.
486 pub async fn build(mut self) -> Result<Agent> {
487 let bind_addr: std::net::SocketAddr = self.bind_addr.parse().map_err(|_| Error::Io {
488 target: None,
489 source: std::io::Error::new(
490 std::io::ErrorKind::InvalidInput,
491 format!("invalid bind address: {}", self.bind_addr),
492 ),
493 })?;
494
495 let socket = bind_udp_socket(bind_addr, self.recv_buffer_size)
496 .await
497 .map_err(|e| Error::Io {
498 target: Some(bind_addr),
499 source: e,
500 })?;
501
502 let local_addr = socket.local_addr().map_err(|e| Error::Io {
503 target: Some(bind_addr),
504 source: e,
505 })?;
506
507 let socket_state =
508 UdpSocketState::new(UdpSockRef::from(&socket)).map_err(|e| Error::Io {
509 target: Some(bind_addr),
510 source: e,
511 })?;
512
513 // Generate default engine ID if not provided
514 let engine_id = self.engine_id.unwrap_or_else(|| {
515 // RFC 3411 format: enterprise number + format + local identifier
516 // Use a simple format: 0x80 (local) + timestamp + random
517 let mut id = vec![0x80, 0x00, 0x00, 0x00, 0x01]; // Enterprise format indicator
518 let timestamp = std::time::SystemTime::now()
519 .duration_since(std::time::UNIX_EPOCH)
520 .unwrap_or_default()
521 .as_secs();
522 id.extend_from_slice(×tamp.to_be_bytes());
523 id
524 });
525
526 // Sort handlers by prefix length (longest first) for matching
527 self.handlers
528 .sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
529
530 let cancel = self.cancel.unwrap_or_default();
531
532 // Create concurrency limiter if configured
533 let concurrency_limit = self
534 .max_concurrent_requests
535 .map(|n| Arc::new(Semaphore::new(n)));
536
537 Ok(Agent {
538 inner: Arc::new(AgentInner {
539 socket: Arc::new(socket),
540 socket_state,
541 local_addr,
542 communities: self.communities,
543 usm_users: self.usm_users,
544 handlers: self.handlers,
545 engine_id,
546 engine_boots: AtomicU32::new(1),
547 engine_time: AtomicU32::new(0),
548 engine_start: Instant::now(),
549 salt_counter: SaltCounter::new(),
550 max_message_size: self.max_message_size,
551 concurrency_limit,
552 vacm: self.vacm,
553 snmp_invalid_msgs: AtomicU32::new(0),
554 snmp_unknown_security_models: AtomicU32::new(0),
555 snmp_silent_drops: AtomicU32::new(0),
556 cancel,
557 }),
558 })
559 }
560}
561
562impl Default for AgentBuilder {
563 fn default() -> Self {
564 Self::new()
565 }
566}
567
/// Inner state shared across agent clones.
///
/// Built once by `AgentBuilder::build` and held behind an `Arc` by [`Agent`].
pub(crate) struct AgentInner {
    /// UDP socket used both to receive requests and to send responses.
    pub(crate) socket: Arc<UdpSocket>,
    /// `quinn_udp` socket state used for the receive and send calls.
    pub(crate) socket_state: UdpSocketState,
    /// Address reported by the socket after binding.
    pub(crate) local_addr: SocketAddr,
    /// Community strings accepted for v1/v2c requests; empty rejects all.
    pub(crate) communities: Vec<Vec<u8>>,
    /// SNMPv3 USM users, keyed by user name bytes.
    pub(crate) usm_users: HashMap<Bytes, UsmUserConfig>,
    /// Registered handlers, sorted longest prefix first at build time.
    pub(crate) handlers: Vec<RegisteredHandler>,
    /// Engine ID, either supplied by the caller or generated at build time.
    pub(crate) engine_id: Vec<u8>,
    /// Engine boots counter; initialised to 1 at build time.
    pub(crate) engine_boots: AtomicU32,
    /// Whole seconds since `engine_start`, refreshed as requests are processed.
    pub(crate) engine_time: AtomicU32,
    /// Instant captured when the agent was built.
    pub(crate) engine_start: Instant,
    /// Salt counter — presumably feeds SNMPv3 privacy salts; confirm in `crate::v3`.
    pub(crate) salt_counter: SaltCounter,
    /// Maximum message size for responses, in bytes.
    pub(crate) max_message_size: usize,
    /// Semaphore bounding concurrently processed requests; `None` means unbounded.
    pub(crate) concurrency_limit: Option<Arc<Semaphore>>,
    /// Optional VACM configuration; access checks run only when present.
    pub(crate) vacm: Option<VacmConfig>,
    // RFC 3412 statistics counters
    /// snmpInvalidMsgs (1.3.6.1.6.3.11.2.1.2) - messages with invalid msgFlags
    /// (e.g., privacy without authentication)
    pub(crate) snmp_invalid_msgs: AtomicU32,
    /// snmpUnknownSecurityModels (1.3.6.1.6.3.11.2.1.1) - messages with
    /// unrecognized security model
    pub(crate) snmp_unknown_security_models: AtomicU32,
    /// snmpSilentDrops (1.3.6.1.6.3.11.2.1.3) - confirmed-class PDUs silently
    /// dropped because even an empty response would exceed max message size
    pub(crate) snmp_silent_drops: AtomicU32,
    /// Cancellation token for graceful shutdown.
    pub(crate) cancel: CancellationToken,
}
597
598/// SNMP Agent.
599///
600/// Listens for and responds to SNMP requests (GET, GETNEXT, GETBULK, SET).
601///
602/// # Example
603///
604/// ```rust,no_run
605/// use async_snmp::agent::Agent;
606/// use async_snmp::oid;
607///
608/// # async fn example() -> Result<(), async_snmp::Error> {
609/// let agent = Agent::builder()
610/// .bind("0.0.0.0:161")
611/// .community(b"public")
612/// .build()
613/// .await?;
614///
615/// agent.run().await
616/// # }
617/// ```
pub struct Agent {
    /// Shared agent state, held behind an `Arc`.
    pub(crate) inner: Arc<AgentInner>,
}
621
622impl Agent {
623 /// Create a builder for configuring the agent.
624 pub fn builder() -> AgentBuilder {
625 AgentBuilder::new()
626 }
627
628 /// Get the local address the agent is bound to.
629 pub fn local_addr(&self) -> SocketAddr {
630 self.inner.local_addr
631 }
632
633 /// Get the engine ID.
634 pub fn engine_id(&self) -> &[u8] {
635 &self.inner.engine_id
636 }
637
638 /// Get the cancellation token for this agent.
639 ///
640 /// Call `token.cancel()` to initiate graceful shutdown.
641 pub fn cancel(&self) -> CancellationToken {
642 self.inner.cancel.clone()
643 }
644
645 /// Get the snmpInvalidMsgs counter value.
646 ///
647 /// This counter tracks messages with invalid msgFlags, such as
648 /// privacy-without-authentication (RFC 3412 Section 7.2 Step 5d).
649 ///
650 /// OID: 1.3.6.1.6.3.11.2.1.2
651 pub fn snmp_invalid_msgs(&self) -> u32 {
652 self.inner.snmp_invalid_msgs.load(Ordering::Relaxed)
653 }
654
655 /// Get the snmpUnknownSecurityModels counter value.
656 ///
657 /// This counter tracks messages with unrecognized security models
658 /// (RFC 3412 Section 7.2 Step 2).
659 ///
660 /// OID: 1.3.6.1.6.3.11.2.1.1
661 pub fn snmp_unknown_security_models(&self) -> u32 {
662 self.inner
663 .snmp_unknown_security_models
664 .load(Ordering::Relaxed)
665 }
666
667 /// Get the snmpSilentDrops counter value.
668 ///
669 /// This counter tracks confirmed-class PDUs (GetRequest, GetNextRequest,
670 /// GetBulkRequest, SetRequest, InformRequest) that were silently dropped
671 /// because even an empty Response-PDU would exceed the maximum message
672 /// size constraint (RFC 3412 Section 7.1).
673 ///
674 /// OID: 1.3.6.1.6.3.11.2.1.3
675 pub fn snmp_silent_drops(&self) -> u32 {
676 self.inner.snmp_silent_drops.load(Ordering::Relaxed)
677 }
678
679 /// Run the agent, processing requests concurrently.
680 ///
681 /// Requests are processed in parallel up to the configured
682 /// `max_concurrent_requests` limit (default: 1000). This method runs
683 /// until the cancellation token is triggered.
684 #[instrument(skip(self), err, fields(snmp.local_addr = %self.local_addr()))]
685 pub async fn run(&self) -> Result<()> {
686 let mut buf = vec![0u8; 65535];
687
688 loop {
689 let recv_meta = tokio::select! {
690 result = self.recv_packet(&mut buf) => {
691 result?
692 }
693 _ = self.inner.cancel.cancelled() => {
694 tracing::info!("agent shutdown requested");
695 return Ok(());
696 }
697 };
698
699 let data = Bytes::copy_from_slice(&buf[..recv_meta.len]);
700 let agent = self.clone();
701
702 let permit = if let Some(ref sem) = self.inner.concurrency_limit {
703 Some(sem.clone().acquire_owned().await.expect("semaphore closed"))
704 } else {
705 None
706 };
707
708 tokio::spawn(async move {
709 agent.update_engine_time();
710
711 match agent.handle_request(data, recv_meta.addr).await {
712 Ok(Some(response_bytes)) => {
713 if let Err(e) = agent.send_response(&response_bytes, &recv_meta).await {
714 tracing::warn!(snmp.source = %recv_meta.addr, error = %e, "failed to send response");
715 }
716 }
717 Ok(None) => {}
718 Err(e) => {
719 tracing::warn!(snmp.source = %recv_meta.addr, error = %e, "error handling request");
720 }
721 }
722
723 drop(permit);
724 });
725 }
726 }
727
728 async fn recv_packet(&self, buf: &mut [u8]) -> Result<RecvMeta> {
729 let mut iov = [IoSliceMut::new(buf)];
730 let mut meta = [RecvMeta::default()];
731
732 loop {
733 self.inner.socket.readable().await.map_err(|e| Error::Io {
734 target: Some(self.inner.local_addr),
735 source: e,
736 })?;
737
738 let result = self.inner.socket.try_io(tokio::io::Interest::READABLE, || {
739 let sref = UdpSockRef::from(&*self.inner.socket);
740 self.inner.socket_state.recv(sref, &mut iov, &mut meta)
741 });
742
743 match result {
744 Ok(n) if n > 0 => return Ok(meta[0]),
745 Ok(_) => continue,
746 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
747 Err(e) => {
748 return Err(Error::Io {
749 target: Some(self.inner.local_addr),
750 source: e,
751 });
752 }
753 }
754 }
755 }
756
757 async fn send_response(&self, data: &[u8], recv_meta: &RecvMeta) -> std::io::Result<()> {
758 let transmit = Transmit {
759 destination: recv_meta.addr,
760 ecn: None,
761 contents: data,
762 segment_size: None,
763 src_ip: recv_meta.dst_ip,
764 };
765
766 loop {
767 self.inner.socket.writable().await?;
768
769 let result = self.inner.socket.try_io(tokio::io::Interest::WRITABLE, || {
770 let sref = UdpSockRef::from(&*self.inner.socket);
771 self.inner.socket_state.try_send(sref, &transmit)
772 });
773
774 match result {
775 Ok(()) => return Ok(()),
776 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
777 Err(e) => return Err(e),
778 }
779 }
780 }
781
782 /// Process a single request and return the response bytes.
783 ///
784 /// Returns `None` if no response should be sent.
785 async fn handle_request(&self, data: Bytes, source: SocketAddr) -> Result<Option<Bytes>> {
786 // Peek at version
787 let mut decoder = Decoder::new(data.clone());
788 let mut seq = decoder.read_sequence()?;
789 let version_num = seq.read_integer()?;
790 let version = Version::from_i32(version_num).ok_or_else(|| {
791 Error::decode(seq.offset(), DecodeErrorKind::UnknownVersion(version_num))
792 })?;
793 drop(seq);
794 drop(decoder);
795
796 match version {
797 Version::V1 => self.handle_v1(data, source).await,
798 Version::V2c => self.handle_v2c(data, source).await,
799 Version::V3 => self.handle_v3(data, source).await,
800 }
801 }
802
803 /// Update engine time based on elapsed time since start.
804 fn update_engine_time(&self) {
805 let elapsed = self.inner.engine_start.elapsed().as_secs() as u32;
806 self.inner.engine_time.store(elapsed, Ordering::Relaxed);
807 }
808
809 /// Validate community string using constant-time comparison.
810 ///
811 /// Uses constant-time comparison to prevent timing attacks that could
812 /// be used to guess valid community strings character by character.
813 pub(crate) fn validate_community(&self, community: &[u8]) -> bool {
814 if self.inner.communities.is_empty() {
815 // No communities configured = reject all
816 return false;
817 }
818 // Use constant-time comparison for each community string.
819 // We compare against all configured communities regardless of
820 // early matches to maintain constant-time behavior.
821 let mut valid = false;
822 for configured in &self.inner.communities {
823 // ct_eq returns a Choice, which we convert to bool after comparison
824 if configured.len() == community.len()
825 && bool::from(configured.as_slice().ct_eq(community))
826 {
827 valid = true;
828 }
829 }
830 valid
831 }
832
833 /// Dispatch a request to the appropriate handler.
834 async fn dispatch_request(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
835 match pdu.pdu_type {
836 PduType::GetRequest => self.handle_get(ctx, pdu).await,
837 PduType::GetNextRequest => self.handle_get_next(ctx, pdu).await,
838 PduType::GetBulkRequest => self.handle_get_bulk(ctx, pdu).await,
839 PduType::SetRequest => self.handle_set(ctx, pdu).await,
840 PduType::InformRequest => self.handle_inform(pdu),
841 _ => {
842 // Should not happen - filtered earlier
843 Ok(pdu.to_error_response(ErrorStatus::GenErr, 0))
844 }
845 }
846 }
847
848 /// Handle InformRequest PDU.
849 ///
850 /// Per RFC 3416 Section 4.2.7, an InformRequest is a confirmed-class PDU
851 /// that the receiver acknowledges by returning a Response with the same
852 /// request-id and varbind list.
853 fn handle_inform(&self, pdu: &Pdu) -> Result<Pdu> {
854 // Simply acknowledge by returning the same varbinds in a Response
855 Ok(Pdu {
856 pdu_type: PduType::Response,
857 request_id: pdu.request_id,
858 error_status: 0,
859 error_index: 0,
860 varbinds: pdu.varbinds.clone(),
861 })
862 }
863
864 /// Handle GET request.
865 async fn handle_get(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
866 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
867
868 for (index, vb) in pdu.varbinds.iter().enumerate() {
869 // VACM read access check
870 if let Some(ref vacm) = self.inner.vacm
871 && !vacm.check_access(ctx.read_view.as_ref(), &vb.oid)
872 {
873 // v1: noSuchName, v2c/v3: noAccess or NoSuchObject
874 if ctx.version == Version::V1 {
875 return Ok(Pdu {
876 pdu_type: PduType::Response,
877 request_id: pdu.request_id,
878 error_status: ErrorStatus::NoSuchName.as_i32(),
879 error_index: (index + 1) as i32,
880 varbinds: pdu.varbinds.clone(),
881 });
882 } else {
883 // For GET, return NoSuchObject for inaccessible OIDs per RFC 3415
884 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::NoSuchObject));
885 continue;
886 }
887 }
888
889 let result = if let Some(handler) = self.find_handler(&vb.oid) {
890 handler.handler.get(ctx, &vb.oid).await
891 } else {
892 GetResult::NoSuchObject
893 };
894
895 let response_value = match result {
896 GetResult::Value(v) => v,
897 GetResult::NoSuchObject => {
898 // v1 returns noSuchName error, v2c/v3 returns NoSuchObject exception
899 if ctx.version == Version::V1 {
900 return Ok(Pdu {
901 pdu_type: PduType::Response,
902 request_id: pdu.request_id,
903 error_status: ErrorStatus::NoSuchName.as_i32(),
904 error_index: (index + 1) as i32,
905 varbinds: pdu.varbinds.clone(),
906 });
907 } else {
908 Value::NoSuchObject
909 }
910 }
911 GetResult::NoSuchInstance => {
912 // v1 returns noSuchName error, v2c/v3 returns NoSuchInstance exception
913 if ctx.version == Version::V1 {
914 return Ok(Pdu {
915 pdu_type: PduType::Response,
916 request_id: pdu.request_id,
917 error_status: ErrorStatus::NoSuchName.as_i32(),
918 error_index: (index + 1) as i32,
919 varbinds: pdu.varbinds.clone(),
920 });
921 } else {
922 Value::NoSuchInstance
923 }
924 }
925 };
926
927 response_varbinds.push(VarBind::new(vb.oid.clone(), response_value));
928 }
929
930 Ok(Pdu {
931 pdu_type: PduType::Response,
932 request_id: pdu.request_id,
933 error_status: 0,
934 error_index: 0,
935 varbinds: response_varbinds,
936 })
937 }
938
939 /// Handle GETNEXT request.
940 async fn handle_get_next(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
941 let mut response_varbinds = Vec::with_capacity(pdu.varbinds.len());
942
943 for (index, vb) in pdu.varbinds.iter().enumerate() {
944 // Try to find the next OID from any handler
945 let next = self.get_next_oid(ctx, &vb.oid).await;
946
947 // Check VACM access for the returned OID (if VACM enabled)
948 let next = if let Some(ref next_vb) = next {
949 if let Some(ref vacm) = self.inner.vacm {
950 if vacm.check_access(ctx.read_view.as_ref(), &next_vb.oid) {
951 next
952 } else {
953 // OID not accessible, continue searching
954 // For simplicity, we just return EndOfMibView here
955 // A more complete implementation would continue the search
956 None
957 }
958 } else {
959 next
960 }
961 } else {
962 next
963 };
964
965 match next {
966 Some(next_vb) => {
967 response_varbinds.push(next_vb);
968 }
969 None => {
970 // v1 returns noSuchName, v2c/v3 returns endOfMibView
971 if ctx.version == Version::V1 {
972 return Ok(Pdu {
973 pdu_type: PduType::Response,
974 request_id: pdu.request_id,
975 error_status: ErrorStatus::NoSuchName.as_i32(),
976 error_index: (index + 1) as i32,
977 varbinds: pdu.varbinds.clone(),
978 });
979 } else {
980 response_varbinds.push(VarBind::new(vb.oid.clone(), Value::EndOfMibView));
981 }
982 }
983 }
984 }
985
986 Ok(Pdu {
987 pdu_type: PduType::Response,
988 request_id: pdu.request_id,
989 error_status: 0,
990 error_index: 0,
991 varbinds: response_varbinds,
992 })
993 }
994
995 /// Handle GETBULK request.
996 ///
997 /// Per RFC 3416 Section 4.2.3, if the response would exceed the message
998 /// size limit, we return fewer variable bindings rather than all of them.
999 async fn handle_get_bulk(&self, ctx: &RequestContext, pdu: &Pdu) -> Result<Pdu> {
1000 // For GETBULK, error_status is non_repeaters and error_index is max_repetitions
1001 let non_repeaters = pdu.error_status.max(0) as usize;
1002 let max_repetitions = pdu.error_index.max(0) as usize;
1003
1004 let mut response_varbinds = Vec::new();
1005 let mut current_size: usize = RESPONSE_OVERHEAD;
1006 let max_size = self.inner.max_message_size;
1007
1008 // Helper to check if we can add a varbind
1009 let can_add = |vb: &VarBind, current_size: usize| -> bool {
1010 current_size + vb.encoded_size() <= max_size
1011 };
1012
1013 // Handle non-repeaters (first N varbinds get one GETNEXT each)
1014 for vb in pdu.varbinds.iter().take(non_repeaters) {
1015 let next_vb = match self.get_next_oid(ctx, &vb.oid).await {
1016 Some(next_vb) => next_vb,
1017 None => VarBind::new(vb.oid.clone(), Value::EndOfMibView),
1018 };
1019
1020 if !can_add(&next_vb, current_size) {
1021 // Can't fit even non-repeaters, return tooBig if we have nothing
1022 if response_varbinds.is_empty() {
1023 return Ok(Pdu {
1024 pdu_type: PduType::Response,
1025 request_id: pdu.request_id,
1026 error_status: ErrorStatus::TooBig.as_i32(),
1027 error_index: 0,
1028 varbinds: pdu.varbinds.clone(),
1029 });
1030 }
1031 // Otherwise return what we have
1032 break;
1033 }
1034
1035 current_size += next_vb.encoded_size();
1036 response_varbinds.push(next_vb);
1037 }
1038
1039 // Handle repeaters
1040 if non_repeaters < pdu.varbinds.len() {
1041 let repeaters = &pdu.varbinds[non_repeaters..];
1042 let mut current_oids: Vec<Oid> = repeaters.iter().map(|vb| vb.oid.clone()).collect();
1043 let mut all_done = vec![false; repeaters.len()];
1044
1045 'outer: for _ in 0..max_repetitions {
1046 let mut row_complete = true;
1047 for (i, oid) in current_oids.iter_mut().enumerate() {
1048 let next_vb = if all_done[i] {
1049 VarBind::new(oid.clone(), Value::EndOfMibView)
1050 } else {
1051 match self.get_next_oid(ctx, oid).await {
1052 Some(next_vb) => {
1053 *oid = next_vb.oid.clone();
1054 row_complete = false;
1055 next_vb
1056 }
1057 None => {
1058 all_done[i] = true;
1059 VarBind::new(oid.clone(), Value::EndOfMibView)
1060 }
1061 }
1062 };
1063
1064 // Check size before adding
1065 if !can_add(&next_vb, current_size) {
1066 // Can't fit more, return what we have
1067 break 'outer;
1068 }
1069
1070 current_size += next_vb.encoded_size();
1071 response_varbinds.push(next_vb);
1072 }
1073
1074 if row_complete {
1075 break;
1076 }
1077 }
1078 }
1079
1080 Ok(Pdu {
1081 pdu_type: PduType::Response,
1082 request_id: pdu.request_id,
1083 error_status: 0,
1084 error_index: 0,
1085 varbinds: response_varbinds,
1086 })
1087 }
1088
1089 /// Find the handler for a given OID.
1090 pub(crate) fn find_handler(&self, oid: &Oid) -> Option<&RegisteredHandler> {
1091 // Handlers are sorted by prefix length (longest first)
1092 self.inner
1093 .handlers
1094 .iter()
1095 .find(|&handler| handler.handler.handles(&handler.prefix, oid))
1096 .map(|v| v as _)
1097 }
1098
1099 /// Get the next OID from any handler.
1100 async fn get_next_oid(&self, ctx: &RequestContext, oid: &Oid) -> Option<VarBind> {
1101 // Find the first handler that can provide a next OID
1102 let mut best_result: Option<VarBind> = None;
1103
1104 for handler in &self.inner.handlers {
1105 if let GetNextResult::Value(next) = handler.handler.get_next(ctx, oid).await {
1106 // Must be lexicographically greater than the request OID
1107 if next.oid > *oid {
1108 match &best_result {
1109 None => best_result = Some(next),
1110 Some(current) if next.oid < current.oid => best_result = Some(next),
1111 _ => {}
1112 }
1113 }
1114 }
1115 }
1116
1117 best_result
1118 }
1119}
1120
1121impl Clone for Agent {
1122 fn clone(&self) -> Self {
1123 Self {
1124 inner: Arc::clone(&self.inner),
1125 }
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use super::*;
1132 use crate::handler::{
1133 BoxFuture, GetNextResult, GetResult, MibHandler, RequestContext, SecurityModel, SetResult,
1134 };
1135 use crate::message::SecurityLevel;
1136 use crate::oid;
1137
1138 struct TestHandler;
1139
1140 impl MibHandler for TestHandler {
1141 fn get<'a>(&'a self, _ctx: &'a RequestContext, oid: &'a Oid) -> BoxFuture<'a, GetResult> {
1142 Box::pin(async move {
1143 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0) {
1144 return GetResult::Value(Value::Integer(42));
1145 }
1146 if oid == &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0) {
1147 return GetResult::Value(Value::OctetString(Bytes::from_static(b"test")));
1148 }
1149 GetResult::NoSuchObject
1150 })
1151 }
1152
1153 fn get_next<'a>(
1154 &'a self,
1155 _ctx: &'a RequestContext,
1156 oid: &'a Oid,
1157 ) -> BoxFuture<'a, GetNextResult> {
1158 Box::pin(async move {
1159 let oid1 = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
1160 let oid2 = oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0);
1161
1162 if oid < &oid1 {
1163 return GetNextResult::Value(VarBind::new(oid1, Value::Integer(42)));
1164 }
1165 if oid < &oid2 {
1166 return GetNextResult::Value(VarBind::new(
1167 oid2,
1168 Value::OctetString(Bytes::from_static(b"test")),
1169 ));
1170 }
1171 GetNextResult::EndOfMibView
1172 })
1173 }
1174 }
1175
1176 fn test_ctx() -> RequestContext {
1177 RequestContext {
1178 source: "127.0.0.1:12345".parse().unwrap(),
1179 version: Version::V2c,
1180 security_model: SecurityModel::V2c,
1181 security_name: Bytes::from_static(b"public"),
1182 security_level: SecurityLevel::NoAuthNoPriv,
1183 context_name: Bytes::new(),
1184 request_id: 1,
1185 pdu_type: PduType::GetRequest,
1186 group_name: None,
1187 read_view: None,
1188 write_view: None,
1189 }
1190 }
1191
/// A fresh builder binds to the standard SNMP port on all interfaces and
/// has no communities, USM users or handlers configured.
#[test]
fn test_agent_builder_defaults() {
    let fresh = AgentBuilder::new();

    assert_eq!(fresh.bind_addr, "0.0.0.0:161");
    assert!(fresh.communities.is_empty());
    assert!(fresh.usm_users.is_empty());
    assert!(fresh.handlers.is_empty());
}
1200
/// Each `community` call adds one more accepted community string.
#[test]
fn test_agent_builder_community() {
    let with_public = AgentBuilder::new().community(b"public");
    let with_both = with_public.community(b"private");

    assert_eq!(with_both.communities.len(), 2);
}
1208
/// `communities` registers every community string from the given collection.
#[test]
fn test_agent_builder_communities() {
    let names = ["public", "private"];
    let builder = AgentBuilder::new().communities(names);

    assert_eq!(builder.communities.len(), 2);
}
1214
/// Registering a handler for a prefix stores exactly one handler entry.
#[test]
fn test_agent_builder_handler() {
    let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);
    let handler = Arc::new(TestHandler);

    let builder = AgentBuilder::new().handler(prefix, handler);

    assert_eq!(builder.handlers.len(), 1);
}
1221
1222 #[tokio::test]
1223 async fn test_mib_handler_default_set() {
1224 let handler = TestHandler;
1225 let mut ctx = test_ctx();
1226 ctx.pdu_type = PduType::SetRequest;
1227
1228 let result = handler
1229 .test_set(&ctx, &oid!(1, 3, 6, 1), &Value::Integer(1))
1230 .await;
1231 assert_eq!(result, SetResult::NotWritable);
1232 }
1233
/// Checks `handles` for OIDs inside, before and after the registered prefix.
#[test]
fn test_mib_handler_handles() {
    let handler = TestHandler;
    let prefix = oid!(1, 3, 6, 1, 4, 1, 99999);

    let inside = oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0);
    let before = oid!(1, 3, 6, 1, 4, 1, 99998);
    let after = oid!(1, 3, 6, 1, 4, 1, 100000);

    // An OID under the prefix is handled
    assert!(handler.handles(&prefix, &inside));

    // An OID sorting before the prefix is handled too (GETNEXT should still try)
    assert!(handler.handles(&prefix, &before));

    // An OID sorting after the prefix is not handled
    assert!(!handler.handles(&prefix, &after));
}
1248
1249 #[tokio::test]
1250 async fn test_test_handler_get() {
1251 let handler = TestHandler;
1252 let ctx = test_ctx();
1253
1254 // Existing OID
1255 let result = handler
1256 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1257 .await;
1258 assert!(matches!(result, GetResult::Value(Value::Integer(42))));
1259
1260 // Non-existing OID
1261 let result = handler
1262 .get(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 99, 0))
1263 .await;
1264 assert!(matches!(result, GetResult::NoSuchObject));
1265 }
1266
1267 #[tokio::test]
1268 async fn test_test_handler_get_next() {
1269 let handler = TestHandler;
1270 let mut ctx = test_ctx();
1271 ctx.pdu_type = PduType::GetNextRequest;
1272
1273 // Before first OID
1274 let next = handler.get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999)).await;
1275 assert!(next.is_value());
1276 if let GetNextResult::Value(vb) = next {
1277 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0));
1278 }
1279
1280 // Between OIDs
1281 let next = handler
1282 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 1, 0))
1283 .await;
1284 assert!(next.is_value());
1285 if let GetNextResult::Value(vb) = next {
1286 assert_eq!(vb.oid, oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0));
1287 }
1288
1289 // After last OID
1290 let next = handler
1291 .get_next(&ctx, &oid!(1, 3, 6, 1, 4, 1, 99999, 2, 0))
1292 .await;
1293 assert!(next.is_end_of_mib_view());
1294 }
1295}