1mod auth;
4mod builder;
5mod retry;
6mod v3;
7mod walk;
8
9pub use auth::{Auth, CommunityVersion, UsmAuth, UsmBuilder};
10pub use builder::{ClientBuilder, Target};
11pub use retry::{Backoff, Retry, RetryBuilder};
12
impl Client<UdpHandle> {
    /// Starts building a client for `target` with the given credentials.
    ///
    /// Both arguments are forwarded unchanged to [`ClientBuilder::new`]; any
    /// further configuration happens on the returned builder.
    pub fn builder(target: impl Into<Target>, auth: impl Into<Auth>) -> ClientBuilder {
        ClientBuilder::new(target, auth)
    }
}
46use crate::error::internal::DecodeErrorKind;
47use crate::error::{Error, Result};
48use crate::message::{CommunityMessage, Message};
49use crate::oid::Oid;
50use crate::pdu::{GetBulkPdu, Pdu};
51use crate::transport::Transport;
52use crate::transport::UdpHandle;
53use crate::v3::{EngineCache, EngineState, SaltCounter};
54use crate::value::Value;
55use crate::varbind::VarBind;
56use crate::version::Version;
57use bytes::Bytes;
58use std::net::SocketAddr;
59use std::sync::Arc;
60use std::sync::RwLock;
61use std::time::{Duration, Instant};
62use tracing::{Span, instrument};
63
64pub use crate::notification::{DerivedKeys, UsmConfig};
65pub use walk::{BulkWalk, OidOrdering, Walk, WalkMode, WalkStream};
66
/// Default value of [`ClientConfig::timeout`]: five seconds.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Default value of [`ClientConfig::max_oids_per_request`]: ten entries per request.
pub const DEFAULT_MAX_OIDS_PER_REQUEST: usize = 10;

/// Default value of [`ClientConfig::max_repetitions`]: 25.
pub const DEFAULT_MAX_REPETITIONS: u32 = 25;
84
/// SNMP client handle, generic over the transport `T` (defaults to [`UdpHandle`]).
///
/// The handle only holds an `Arc` to its shared state, so a clone refers to
/// the same transport, configuration and cached state as the original.
#[derive(Clone)]
pub struct Client<T: Transport = UdpHandle> {
    // Shared state; every clone of the handle points at the same allocation.
    inner: Arc<ClientInner<T>>,
}
92
/// State shared by every clone of a [`Client`].
struct ClientInner<T: Transport> {
    // Transport used to allocate request IDs and to send/receive datagrams.
    transport: T,
    // Settings supplied at construction time.
    config: ClientConfig,
    // Starts out as `None` in both constructors.
    // NOTE(review): presumably filled in by the SNMPv3 code path (not visible here) — confirm.
    engine_state: RwLock<Option<EngineState>>,
    // Starts out as `None` in both constructors.
    // NOTE(review): presumably caches keys derived for the v3 security config — confirm.
    derived_keys: RwLock<Option<DerivedKeys>>,
    // Fresh counter created per client.
    // NOTE(review): presumably feeds v3 privacy salts — confirm.
    salt_counter: SaltCounter,
    // Optional externally supplied cache; only set by `Client::with_engine_cache`.
    engine_cache: Option<Arc<EngineCache>>,
}
105
/// Settings that control how a [`Client`] builds and sends its requests.
#[derive(Clone)]
pub struct ClientConfig {
    /// Protocol version used when encoding requests; responses carrying a
    /// different version are rejected as malformed.
    pub version: Version,
    /// Community bytes placed in community-based messages.
    pub community: Bytes,
    /// Timeout handed to the transport each time a request attempt is registered.
    pub timeout: Duration,
    /// Retry policy: supplies the maximum attempt count and the back-off delay
    /// applied between timed-out attempts.
    pub retry: Retry,
    /// Largest number of entries sent in one request by the `*_many` methods;
    /// longer inputs are split into consecutive batches.
    pub max_oids_per_request: usize,
    /// SNMPv3 security settings. The v3 code path is taken only when this is
    /// `Some` and `version` is [`Version::V3`].
    pub v3_security: Option<UsmConfig>,
    /// Walk mode passed to [`WalkStream`] by [`Client::walk`].
    pub walk_mode: WalkMode,
    /// OID ordering policy passed to every walk started by the client.
    pub oid_ordering: OidOrdering,
    /// Optional result limit passed to every walk started by the client.
    pub max_walk_results: Option<usize>,
    /// Max-repetitions value used by [`Client::walk`] and
    /// [`Client::bulk_walk_default`].
    pub max_repetitions: u32,
}
132
133impl Default for ClientConfig {
134 fn default() -> Self {
138 Self {
139 version: Version::V2c,
140 community: Bytes::from_static(b"public"),
141 timeout: DEFAULT_TIMEOUT,
142 retry: Retry::default(),
143 max_oids_per_request: DEFAULT_MAX_OIDS_PER_REQUEST,
144 v3_security: None,
145 walk_mode: WalkMode::Auto,
146 oid_ordering: OidOrdering::Strict,
147 max_walk_results: None,
148 max_repetitions: DEFAULT_MAX_REPETITIONS,
149 }
150 }
151}
152
153impl<T: Transport> Client<T> {
154 pub fn new(transport: T, config: ClientConfig) -> Self {
161 Self {
162 inner: Arc::new(ClientInner {
163 transport,
164 config,
165 engine_state: RwLock::new(None),
166 derived_keys: RwLock::new(None),
167 salt_counter: SaltCounter::new(),
168 engine_cache: None,
169 }),
170 }
171 }
172
173 pub fn with_engine_cache(
175 transport: T,
176 config: ClientConfig,
177 engine_cache: Arc<EngineCache>,
178 ) -> Self {
179 Self {
180 inner: Arc::new(ClientInner {
181 transport,
182 config,
183 engine_state: RwLock::new(None),
184 derived_keys: RwLock::new(None),
185 salt_counter: SaltCounter::new(),
186 engine_cache: Some(engine_cache),
187 }),
188 }
189 }
190
    /// Returns the peer address reported by the underlying transport.
    pub fn peer_addr(&self) -> SocketAddr {
        self.inner.transport.peer_addr()
    }
198
    /// Obtains a request ID for the next outgoing PDU from the transport.
    fn next_request_id(&self) -> i32 {
        self.inner.transport.alloc_request_id()
    }
205
206 fn is_v3(&self) -> bool {
208 self.inner.config.version == Version::V3 && self.inner.config.v3_security.is_some()
209 }
210
    /// Sends an already-encoded request and waits for the matching response PDU.
    ///
    /// Each attempt registers `request_id` with the transport using the
    /// configured timeout, sends `data`, and waits for a reply. Only
    /// [`Error::Timeout`] results are retried; a transport that reports itself
    /// as reliable gets a single attempt.
    ///
    /// # Errors
    ///
    /// * send and decode failures are propagated unchanged;
    /// * [`Error::MalformedResponse`] when the reply's version or request ID
    ///   does not match the request;
    /// * [`Error::Snmp`] when the reply PDU reports an error status;
    /// * any non-timeout receive error, returned immediately;
    /// * the last timeout error once every attempt has timed out.
    #[instrument(
        level = "debug",
        skip(self, data),
        fields(
            snmp.target = %self.peer_addr(),
            snmp.request_id = request_id,
            snmp.attempt = tracing::field::Empty,
            snmp.elapsed_ms = tracing::field::Empty,
        )
    )]
    async fn send_and_recv(&self, request_id: i32, data: &[u8]) -> Result<Pdu> {
        let start = Instant::now();
        let mut last_error: Option<Box<Error>> = None;
        // Number of *extra* attempts after the first; none for reliable transports.
        let max_attempts = if self.inner.transport.is_reliable() {
            0
        } else {
            self.inner.config.retry.max_attempts
        };

        // Inclusive range: attempt 0 is the initial send, 1..=max_attempts are retries.
        for attempt in 0..=max_attempts {
            Span::current().record("snmp.attempt", attempt);
            if attempt > 0 {
                tracing::debug!(target: "async_snmp::client", "retrying request");
            }

            // Register again on every attempt, before sending, with the configured timeout.
            self.inner
                .transport
                .register_request(request_id, self.inner.config.timeout);

            tracing::trace!(target: "async_snmp::client", { snmp.bytes = data.len() }, "sending request");
            // A send failure is not retried; it leaves the function right here.
            self.inner.transport.send(data).await?;

            match self.inner.transport.recv(request_id).await {
                Ok((response_data, _source)) => {
                    tracing::trace!(target: "async_snmp::client", { snmp.bytes = response_data.len() }, "received response");

                    let response = Message::decode(response_data)?;

                    // The reply must use the same protocol version as the request.
                    let response_version = response.version();
                    let expected_version = self.inner.config.version;
                    if response_version != expected_version {
                        tracing::warn!(target: "async_snmp::client", { ?expected_version, ?response_version, peer = %self.peer_addr() }, "version mismatch in response");
                        return Err(Error::MalformedResponse {
                            target: self.peer_addr(),
                        }
                        .boxed());
                    }

                    let response_pdu = response.into_pdu();

                    // The reply must echo the request ID that was sent.
                    if response_pdu.request_id != request_id {
                        tracing::warn!(target: "async_snmp::client", { expected_request_id = request_id, actual_request_id = response_pdu.request_id, peer = %self.peer_addr() }, "request ID mismatch in response");
                        return Err(Error::MalformedResponse {
                            target: self.peer_addr(),
                        }
                        .boxed());
                    }

                    if response_pdu.is_error() {
                        let status = response_pdu.error_status_enum();
                        // `error_index` is treated as 1-based: subtracting one gives the
                        // varbind position. A zero index, or one that falls outside the
                        // varbind list, leaves `oid` as `None`.
                        let oid = (response_pdu.error_index as usize)
                            .checked_sub(1)
                            .and_then(|idx| response_pdu.varbinds.get(idx))
                            .map(|vb| vb.oid.clone());

                        Span::current()
                            .record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
                        return Err(Error::Snmp {
                            target: self.peer_addr(),
                            status,
                            // Clamp to zero first so the cast to `u32` cannot wrap.
                            index: response_pdu.error_index.max(0) as u32,
                            oid,
                        }
                        .boxed());
                    }

                    Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
                    return Ok(response_pdu);
                }
                // Timeouts are the only retryable failure.
                Err(e) if matches!(*e, Error::Timeout { .. }) => {
                    last_error = Some(e);
                    // Back off only when another attempt will follow.
                    if attempt < max_attempts {
                        let delay = self.inner.config.retry.compute_delay(attempt);
                        if !delay.is_zero() {
                            tracing::debug!(target: "async_snmp::client", { delay_ms = delay.as_millis() as u64 }, "backing off");
                            tokio::time::sleep(delay).await;
                        }
                    }
                    continue;
                }
                Err(e) => {
                    Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
                    return Err(e);
                }
            }
        }

        // Every attempt timed out.
        let elapsed = start.elapsed();
        Span::current().record("snmp.elapsed_ms", elapsed.as_millis() as u64);
        tracing::debug!(target: "async_snmp::client", { request_id, peer = %self.peer_addr(), ?elapsed, retries = max_attempts }, "request timed out");
        // NOTE(review): the loop always runs at least once and each iteration either
        // returns or stores a timeout, so `last_error` is `Some` here and callers see
        // the transport's own timeout error; the fallback below (total elapsed time and
        // retry count) looks unreachable — confirm which error is intended.
        Err(last_error.unwrap_or_else(|| {
            Error::Timeout {
                target: self.peer_addr(),
                elapsed,
                retries: max_attempts,
            }
            .boxed()
        }))
    }
331
332 async fn send_request(&self, pdu: Pdu) -> Result<Pdu> {
334 if self.is_v3() {
336 return self.send_v3_and_recv(pdu).await;
337 }
338
339 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?pdu.pdu_type, snmp.varbind_count = pdu.varbinds.len() }, "sending {} request", pdu.pdu_type);
340
341 let request_id = pdu.request_id;
342 let message = CommunityMessage::new(
343 self.inner.config.version,
344 self.inner.config.community.clone(),
345 pdu,
346 );
347 let data = message.encode();
348 let response = self.send_and_recv(request_id, &data).await?;
349
350 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
351
352 Ok(response)
353 }
354
355 async fn send_bulk_request(&self, pdu: GetBulkPdu) -> Result<Pdu> {
357 if self.is_v3() {
359 let pdu = Pdu::get_bulk(
361 pdu.request_id,
362 pdu.non_repeaters,
363 pdu.max_repetitions,
364 pdu.varbinds,
365 );
366 return self.send_v3_and_recv(pdu).await;
367 }
368
369 tracing::debug!(target: "async_snmp::client", { snmp.non_repeaters = pdu.non_repeaters, snmp.max_repetitions = pdu.max_repetitions, snmp.varbind_count = pdu.varbinds.len() }, "sending GetBulkRequest");
370
371 let request_id = pdu.request_id;
372 let data = CommunityMessage::encode_bulk(
373 self.inner.config.version,
374 self.inner.config.community.clone(),
375 &pdu,
376 );
377 let response = self.send_and_recv(request_id, &data).await?;
378
379 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
380
381 Ok(response)
382 }
383
384 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
386 pub async fn get(&self, oid: &Oid) -> Result<VarBind> {
387 let request_id = self.next_request_id();
388 let pdu = Pdu::get_request(request_id, std::slice::from_ref(oid));
389 let response = self.send_request(pdu).await?;
390
391 response.varbinds.into_iter().next().ok_or_else(|| {
392 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GET response");
393 Error::MalformedResponse {
394 target: self.peer_addr(),
395 }
396 .boxed()
397 })
398 }
399
400 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
421 pub async fn get_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
422 if oids.is_empty() {
423 return Ok(Vec::new());
424 }
425
426 let max_per_request = self.inner.config.max_oids_per_request;
427
428 if oids.len() <= max_per_request {
430 let request_id = self.next_request_id();
431 let pdu = Pdu::get_request(request_id, oids);
432 let response = self.send_request(pdu).await?;
433 return Ok(response.varbinds);
434 }
435
436 let num_batches = oids.len().div_ceil(max_per_request);
438 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GET request into batches");
439
440 let mut all_results = Vec::with_capacity(oids.len());
441
442 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
443 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GET batch");
444 let request_id = self.next_request_id();
445 let pdu = Pdu::get_request(request_id, chunk);
446 let response = self.send_request(pdu).await?;
447 all_results.extend(response.varbinds);
448 }
449
450 Ok(all_results)
451 }
452
453 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
455 pub async fn get_next(&self, oid: &Oid) -> Result<VarBind> {
456 let request_id = self.next_request_id();
457 let pdu = Pdu::get_next_request(request_id, std::slice::from_ref(oid));
458 let response = self.send_request(pdu).await?;
459
460 response.varbinds.into_iter().next().ok_or_else(|| {
461 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GETNEXT response");
462 Error::MalformedResponse {
463 target: self.peer_addr(),
464 }
465 .boxed()
466 })
467 }
468
469 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
489 pub async fn get_next_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
490 if oids.is_empty() {
491 return Ok(Vec::new());
492 }
493
494 let max_per_request = self.inner.config.max_oids_per_request;
495
496 if oids.len() <= max_per_request {
498 let request_id = self.next_request_id();
499 let pdu = Pdu::get_next_request(request_id, oids);
500 let response = self.send_request(pdu).await?;
501 return Ok(response.varbinds);
502 }
503
504 let num_batches = oids.len().div_ceil(max_per_request);
506 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GETNEXT request into batches");
507
508 let mut all_results = Vec::with_capacity(oids.len());
509
510 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
511 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GETNEXT batch");
512 let request_id = self.next_request_id();
513 let pdu = Pdu::get_next_request(request_id, chunk);
514 let response = self.send_request(pdu).await?;
515 all_results.extend(response.varbinds);
516 }
517
518 Ok(all_results)
519 }
520
521 #[instrument(skip(self, value), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
523 pub async fn set(&self, oid: &Oid, value: Value) -> Result<VarBind> {
524 let request_id = self.next_request_id();
525 let varbind = VarBind::new(oid.clone(), value);
526 let pdu = Pdu::set_request(request_id, vec![varbind]);
527 let response = self.send_request(pdu).await?;
528
529 response.varbinds.into_iter().next().ok_or_else(|| {
530 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty SET response");
531 Error::MalformedResponse {
532 target: self.peer_addr(),
533 }
534 .boxed()
535 })
536 }
537
538 #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = varbinds.len()))]
558 pub async fn set_many(&self, varbinds: &[(Oid, Value)]) -> Result<Vec<VarBind>> {
559 if varbinds.is_empty() {
560 return Ok(Vec::new());
561 }
562
563 let max_per_request = self.inner.config.max_oids_per_request;
564
565 if varbinds.len() <= max_per_request {
567 let request_id = self.next_request_id();
568 let vbs: Vec<VarBind> = varbinds
569 .iter()
570 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
571 .collect();
572 let pdu = Pdu::set_request(request_id, vbs);
573 let response = self.send_request(pdu).await?;
574 return Ok(response.varbinds);
575 }
576
577 let num_batches = varbinds.len().div_ceil(max_per_request);
579 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = varbinds.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting SET request into batches");
580
581 let mut all_results = Vec::with_capacity(varbinds.len());
582
583 for (batch_idx, chunk) in varbinds.chunks(max_per_request).enumerate() {
584 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending SET batch");
585 let request_id = self.next_request_id();
586 let vbs: Vec<VarBind> = chunk
587 .iter()
588 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
589 .collect();
590 let pdu = Pdu::set_request(request_id, vbs);
591 let response = self.send_request(pdu).await?;
592 all_results.extend(response.varbinds);
593 }
594
595 Ok(all_results)
596 }
597
598 #[instrument(skip(self, oids), err, fields(
631 snmp.target = %self.peer_addr(),
632 snmp.oid_count = oids.len(),
633 snmp.non_repeaters = non_repeaters,
634 snmp.max_repetitions = max_repetitions
635 ))]
636 pub async fn get_bulk(
637 &self,
638 oids: &[Oid],
639 non_repeaters: i32,
640 max_repetitions: i32,
641 ) -> Result<Vec<VarBind>> {
642 let request_id = self.next_request_id();
643 let pdu = GetBulkPdu::new(request_id, non_repeaters, max_repetitions, oids);
644 let response = self.send_bulk_request(pdu).await?;
645 Ok(response.varbinds)
646 }
647
648 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
674 pub fn walk(&self, oid: Oid) -> Result<WalkStream<T>>
675 where
676 T: 'static,
677 {
678 let ordering = self.inner.config.oid_ordering;
679 let max_results = self.inner.config.max_walk_results;
680 let walk_mode = self.inner.config.walk_mode;
681 let max_repetitions = self.inner.config.max_repetitions as i32;
682 let version = self.inner.config.version;
683
684 WalkStream::new(
685 self.clone(),
686 oid,
687 version,
688 walk_mode,
689 ordering,
690 max_results,
691 max_repetitions,
692 )
693 }
694
695 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
718 pub fn walk_getnext(&self, oid: Oid) -> Walk<T>
719 where
720 T: 'static,
721 {
722 let ordering = self.inner.config.oid_ordering;
723 let max_results = self.inner.config.max_walk_results;
724 Walk::new(self.clone(), oid, ordering, max_results)
725 }
726
727 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid, snmp.max_repetitions = max_repetitions))]
753 pub fn bulk_walk(&self, oid: Oid, max_repetitions: i32) -> BulkWalk<T>
754 where
755 T: 'static,
756 {
757 let ordering = self.inner.config.oid_ordering;
758 let max_results = self.inner.config.max_walk_results;
759 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
760 }
761
762 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
780 pub fn bulk_walk_default(&self, oid: Oid) -> BulkWalk<T>
781 where
782 T: 'static,
783 {
784 let ordering = self.inner.config.oid_ordering;
785 let max_results = self.inner.config.max_walk_results;
786 let max_repetitions = self.inner.config.max_repetitions as i32;
787 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
788 }
789}