1mod auth;
4mod builder;
5mod retry;
6mod v3;
7mod walk;
8
9pub use auth::{Auth, CommunityVersion, UsmAuth, UsmBuilder};
10pub use builder::ClientBuilder;
11pub use retry::{Backoff, Retry, RetryBuilder};
12
/// Constructors available only for clients using the default [`UdpHandle`] transport.
impl Client<UdpHandle> {
    /// Starts a [`ClientBuilder`] for `target` using the given authentication.
    ///
    /// Both arguments are forwarded unchanged to [`ClientBuilder::new`].
    /// NOTE(review): the accepted `target` string format is decided by the
    /// builder, not here — confirm against `ClientBuilder`.
    pub fn builder(target: impl Into<String>, auth: impl Into<Auth>) -> ClientBuilder {
        ClientBuilder::new(target, auth)
    }
}
44use crate::error::internal::DecodeErrorKind;
45use crate::error::{Error, Result};
46use crate::message::{CommunityMessage, Message};
47use crate::oid::Oid;
48use crate::pdu::{GetBulkPdu, Pdu};
49use crate::transport::Transport;
50use crate::transport::UdpHandle;
51use crate::v3::{EngineCache, EngineState, SaltCounter};
52use crate::value::Value;
53use crate::varbind::VarBind;
54use crate::version::Version;
55use bytes::Bytes;
56use std::net::SocketAddr;
57use std::sync::Arc;
58use std::sync::RwLock;
59use std::time::{Duration, Instant};
60use tracing::{Span, instrument};
61
62pub use crate::notification::{DerivedKeys, UsmConfig};
63pub use walk::{BulkWalk, OidOrdering, Walk, WalkMode, WalkStream};
64
/// Timeout used by [`ClientConfig::default`]: 5 seconds.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Value of `max_oids_per_request` used by [`ClientConfig::default`]: 10 OIDs.
pub const DEFAULT_MAX_OIDS_PER_REQUEST: usize = 10;

/// Value of `max_repetitions` used by [`ClientConfig::default`]: 25.
pub const DEFAULT_MAX_REPETITIONS: u32 = 25;
82
/// Handle for issuing SNMP requests over a transport `T`.
///
/// The only field is an [`Arc`], so cloning a `Client` clones the pointer and
/// every clone shares the same transport, configuration and v3 state.
#[derive(Clone)]
pub struct Client<T: Transport = UdpHandle> {
    // State shared between all clones of this handle.
    inner: Arc<ClientInner<T>>,
}
90
/// State shared by every clone of a [`Client`].
struct ClientInner<T: Transport> {
    // Transport used to allocate request IDs, register/send requests and
    // receive responses.
    transport: T,
    // Configuration supplied when the client was constructed.
    config: ClientConfig,
    // Initialised to `None` by both constructors.
    // NOTE(review): presumably filled in by the SNMPv3 code path (module `v3`)
    // after engine discovery — confirm there.
    engine_state: RwLock<Option<EngineState>>,
    // Initialised to `None` by both constructors.
    // NOTE(review): presumably caches keys derived for the v3 security
    // settings — confirm in module `v3`.
    derived_keys: RwLock<Option<DerivedKeys>>,
    // Fresh counter per client; not used in this part of the file.
    // NOTE(review): presumably supplies privacy salts for v3 — confirm.
    salt_counter: SaltCounter,
    // `Some` only when the client was built via `Client::with_engine_cache`.
    engine_cache: Option<Arc<EngineCache>>,
}
103
/// Settings a [`Client`] is constructed with.
///
/// [`ClientConfig::default`] yields SNMPv2c with the `public` community and
/// the `DEFAULT_*` constants of this module.
#[derive(Clone)]
pub struct ClientConfig {
    /// Protocol version placed in community-based requests; a response
    /// carrying a different version is rejected as malformed.
    pub version: Version,
    /// Community string sent with community-based requests.
    pub community: Bytes,
    /// Timeout handed to the transport each time a request is registered
    /// (once per attempt).
    pub timeout: Duration,
    /// Retry policy: supplies the attempt limit and the delay applied between
    /// attempts after a timeout.
    pub retry: Retry,
    /// Largest number of OIDs the `*_many` methods put into a single request
    /// before splitting the work into several requests.
    pub max_oids_per_request: usize,
    /// USM settings. The v3 code path is taken only when this is `Some` and
    /// `version` is `Version::V3`.
    pub v3_security: Option<UsmConfig>,
    /// Walk mode handed to the stream created by `Client::walk`.
    pub walk_mode: WalkMode,
    /// OID ordering policy handed to every walk constructor.
    pub oid_ordering: OidOrdering,
    /// Optional limit handed to every walk constructor.
    /// NOTE(review): presumably caps the number of results a walk yields —
    /// confirm in module `walk`.
    pub max_walk_results: Option<usize>,
    /// `max_repetitions` used by `Client::walk` and `Client::bulk_walk_default`.
    pub max_repetitions: u32,
}
130
131impl Default for ClientConfig {
132 fn default() -> Self {
136 Self {
137 version: Version::V2c,
138 community: Bytes::from_static(b"public"),
139 timeout: DEFAULT_TIMEOUT,
140 retry: Retry::default(),
141 max_oids_per_request: DEFAULT_MAX_OIDS_PER_REQUEST,
142 v3_security: None,
143 walk_mode: WalkMode::Auto,
144 oid_ordering: OidOrdering::Strict,
145 max_walk_results: None,
146 max_repetitions: DEFAULT_MAX_REPETITIONS,
147 }
148 }
149}
150
151impl<T: Transport> Client<T> {
152 pub fn new(transport: T, config: ClientConfig) -> Self {
159 Self {
160 inner: Arc::new(ClientInner {
161 transport,
162 config,
163 engine_state: RwLock::new(None),
164 derived_keys: RwLock::new(None),
165 salt_counter: SaltCounter::new(),
166 engine_cache: None,
167 }),
168 }
169 }
170
171 pub fn with_engine_cache(
173 transport: T,
174 config: ClientConfig,
175 engine_cache: Arc<EngineCache>,
176 ) -> Self {
177 Self {
178 inner: Arc::new(ClientInner {
179 transport,
180 config,
181 engine_state: RwLock::new(None),
182 derived_keys: RwLock::new(None),
183 salt_counter: SaltCounter::new(),
184 engine_cache: Some(engine_cache),
185 }),
186 }
187 }
188
/// Returns the peer address reported by the underlying transport.
pub fn peer_addr(&self) -> SocketAddr {
    self.inner.transport.peer_addr()
}
196
/// Obtains the request ID for the next PDU from the transport.
// NOTE(review): uniqueness guarantees are whatever `Transport::alloc_request_id`
// provides — not visible here.
fn next_request_id(&self) -> i32 {
    self.inner.transport.alloc_request_id()
}
203
204 fn is_v3(&self) -> bool {
206 self.inner.config.version == Version::V3 && self.inner.config.v3_security.is_some()
207 }
208
209 #[instrument(
211 level = "debug",
212 skip(self, data),
213 fields(
214 snmp.target = %self.peer_addr(),
215 snmp.request_id = request_id,
216 snmp.attempt = tracing::field::Empty,
217 snmp.elapsed_ms = tracing::field::Empty,
218 )
219 )]
220 async fn send_and_recv(&self, request_id: i32, data: &[u8]) -> Result<Pdu> {
221 let start = Instant::now();
222 let mut last_error: Option<Box<Error>> = None;
223 let max_attempts = if self.inner.transport.is_reliable() {
224 0
225 } else {
226 self.inner.config.retry.max_attempts
227 };
228
229 for attempt in 0..=max_attempts {
230 Span::current().record("snmp.attempt", attempt);
231 if attempt > 0 {
232 tracing::debug!(target: "async_snmp::client", "retrying request");
233 }
234
235 self.inner
237 .transport
238 .register_request(request_id, self.inner.config.timeout);
239
240 tracing::trace!(target: "async_snmp::client", { snmp.bytes = data.len() }, "sending request");
242 self.inner.transport.send(data).await?;
243
244 match self.inner.transport.recv(request_id).await {
246 Ok((response_data, _source)) => {
247 tracing::trace!(target: "async_snmp::client", { snmp.bytes = response_data.len() }, "received response");
248
249 let response = Message::decode(response_data)?;
251
252 let response_version = response.version();
254 let expected_version = self.inner.config.version;
255 if response_version != expected_version {
256 tracing::warn!(target: "async_snmp::client", { ?expected_version, ?response_version, peer = %self.peer_addr() }, "version mismatch in response");
257 return Err(Error::MalformedResponse {
258 target: self.peer_addr(),
259 }
260 .boxed());
261 }
262
263 let response_pdu = response.into_pdu();
264
265 if response_pdu.request_id != request_id {
267 tracing::warn!(target: "async_snmp::client", { expected_request_id = request_id, actual_request_id = response_pdu.request_id, peer = %self.peer_addr() }, "request ID mismatch in response");
268 return Err(Error::MalformedResponse {
269 target: self.peer_addr(),
270 }
271 .boxed());
272 }
273
274 if response_pdu.is_error() {
276 let status = response_pdu.error_status_enum();
277 let oid = (response_pdu.error_index as usize)
279 .checked_sub(1)
280 .and_then(|idx| response_pdu.varbinds.get(idx))
281 .map(|vb| vb.oid.clone());
282
283 Span::current()
284 .record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
285 return Err(Error::Snmp {
286 target: self.peer_addr(),
287 status,
288 index: response_pdu.error_index.max(0) as u32,
289 oid,
290 }
291 .boxed());
292 }
293
294 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
295 return Ok(response_pdu);
296 }
297 Err(e) if matches!(*e, Error::Timeout { .. }) => {
298 last_error = Some(e);
299 if attempt < max_attempts {
301 let delay = self.inner.config.retry.compute_delay(attempt);
302 if !delay.is_zero() {
303 tracing::debug!(target: "async_snmp::client", { delay_ms = delay.as_millis() as u64 }, "backing off");
304 tokio::time::sleep(delay).await;
305 }
306 }
307 continue;
308 }
309 Err(e) => {
310 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
311 return Err(e);
312 }
313 }
314 }
315
316 let elapsed = start.elapsed();
318 Span::current().record("snmp.elapsed_ms", elapsed.as_millis() as u64);
319 tracing::debug!(target: "async_snmp::client", { request_id, peer = %self.peer_addr(), ?elapsed, retries = max_attempts }, "request timed out");
320 Err(last_error.unwrap_or_else(|| {
321 Error::Timeout {
322 target: self.peer_addr(),
323 elapsed,
324 retries: max_attempts,
325 }
326 .boxed()
327 }))
328 }
329
330 async fn send_request(&self, pdu: Pdu) -> Result<Pdu> {
332 if self.is_v3() {
334 return self.send_v3_and_recv(pdu).await;
335 }
336
337 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?pdu.pdu_type, snmp.varbind_count = pdu.varbinds.len() }, "sending {} request", pdu.pdu_type);
338
339 let request_id = pdu.request_id;
340 let message = CommunityMessage::new(
341 self.inner.config.version,
342 self.inner.config.community.clone(),
343 pdu,
344 );
345 let data = message.encode();
346 let response = self.send_and_recv(request_id, &data).await?;
347
348 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
349
350 Ok(response)
351 }
352
353 async fn send_bulk_request(&self, pdu: GetBulkPdu) -> Result<Pdu> {
355 if self.is_v3() {
357 let pdu = Pdu::get_bulk(
359 pdu.request_id,
360 pdu.non_repeaters,
361 pdu.max_repetitions,
362 pdu.varbinds,
363 );
364 return self.send_v3_and_recv(pdu).await;
365 }
366
367 tracing::debug!(target: "async_snmp::client", { snmp.non_repeaters = pdu.non_repeaters, snmp.max_repetitions = pdu.max_repetitions, snmp.varbind_count = pdu.varbinds.len() }, "sending GetBulkRequest");
368
369 let request_id = pdu.request_id;
370 let data = CommunityMessage::encode_bulk(
371 self.inner.config.version,
372 self.inner.config.community.clone(),
373 &pdu,
374 );
375 let response = self.send_and_recv(request_id, &data).await?;
376
377 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
378
379 Ok(response)
380 }
381
382 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
384 pub async fn get(&self, oid: &Oid) -> Result<VarBind> {
385 let request_id = self.next_request_id();
386 let pdu = Pdu::get_request(request_id, std::slice::from_ref(oid));
387 let response = self.send_request(pdu).await?;
388
389 response.varbinds.into_iter().next().ok_or_else(|| {
390 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GET response");
391 Error::MalformedResponse {
392 target: self.peer_addr(),
393 }
394 .boxed()
395 })
396 }
397
398 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
419 pub async fn get_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
420 if oids.is_empty() {
421 return Ok(Vec::new());
422 }
423
424 let max_per_request = self.inner.config.max_oids_per_request;
425
426 if oids.len() <= max_per_request {
428 let request_id = self.next_request_id();
429 let pdu = Pdu::get_request(request_id, oids);
430 let response = self.send_request(pdu).await?;
431 return Ok(response.varbinds);
432 }
433
434 let num_batches = oids.len().div_ceil(max_per_request);
436 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GET request into batches");
437
438 let mut all_results = Vec::with_capacity(oids.len());
439
440 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
441 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GET batch");
442 let request_id = self.next_request_id();
443 let pdu = Pdu::get_request(request_id, chunk);
444 let response = self.send_request(pdu).await?;
445 all_results.extend(response.varbinds);
446 }
447
448 Ok(all_results)
449 }
450
451 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
453 pub async fn get_next(&self, oid: &Oid) -> Result<VarBind> {
454 let request_id = self.next_request_id();
455 let pdu = Pdu::get_next_request(request_id, std::slice::from_ref(oid));
456 let response = self.send_request(pdu).await?;
457
458 response.varbinds.into_iter().next().ok_or_else(|| {
459 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GETNEXT response");
460 Error::MalformedResponse {
461 target: self.peer_addr(),
462 }
463 .boxed()
464 })
465 }
466
467 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
487 pub async fn get_next_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
488 if oids.is_empty() {
489 return Ok(Vec::new());
490 }
491
492 let max_per_request = self.inner.config.max_oids_per_request;
493
494 if oids.len() <= max_per_request {
496 let request_id = self.next_request_id();
497 let pdu = Pdu::get_next_request(request_id, oids);
498 let response = self.send_request(pdu).await?;
499 return Ok(response.varbinds);
500 }
501
502 let num_batches = oids.len().div_ceil(max_per_request);
504 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GETNEXT request into batches");
505
506 let mut all_results = Vec::with_capacity(oids.len());
507
508 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
509 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GETNEXT batch");
510 let request_id = self.next_request_id();
511 let pdu = Pdu::get_next_request(request_id, chunk);
512 let response = self.send_request(pdu).await?;
513 all_results.extend(response.varbinds);
514 }
515
516 Ok(all_results)
517 }
518
519 #[instrument(skip(self, value), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
521 pub async fn set(&self, oid: &Oid, value: Value) -> Result<VarBind> {
522 let request_id = self.next_request_id();
523 let varbind = VarBind::new(oid.clone(), value);
524 let pdu = Pdu::set_request(request_id, vec![varbind]);
525 let response = self.send_request(pdu).await?;
526
527 response.varbinds.into_iter().next().ok_or_else(|| {
528 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty SET response");
529 Error::MalformedResponse {
530 target: self.peer_addr(),
531 }
532 .boxed()
533 })
534 }
535
536 #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = varbinds.len()))]
556 pub async fn set_many(&self, varbinds: &[(Oid, Value)]) -> Result<Vec<VarBind>> {
557 if varbinds.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let max_per_request = self.inner.config.max_oids_per_request;
562
563 if varbinds.len() <= max_per_request {
565 let request_id = self.next_request_id();
566 let vbs: Vec<VarBind> = varbinds
567 .iter()
568 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
569 .collect();
570 let pdu = Pdu::set_request(request_id, vbs);
571 let response = self.send_request(pdu).await?;
572 return Ok(response.varbinds);
573 }
574
575 let num_batches = varbinds.len().div_ceil(max_per_request);
577 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = varbinds.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting SET request into batches");
578
579 let mut all_results = Vec::with_capacity(varbinds.len());
580
581 for (batch_idx, chunk) in varbinds.chunks(max_per_request).enumerate() {
582 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending SET batch");
583 let request_id = self.next_request_id();
584 let vbs: Vec<VarBind> = chunk
585 .iter()
586 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
587 .collect();
588 let pdu = Pdu::set_request(request_id, vbs);
589 let response = self.send_request(pdu).await?;
590 all_results.extend(response.varbinds);
591 }
592
593 Ok(all_results)
594 }
595
596 #[instrument(skip(self, oids), err, fields(
629 snmp.target = %self.peer_addr(),
630 snmp.oid_count = oids.len(),
631 snmp.non_repeaters = non_repeaters,
632 snmp.max_repetitions = max_repetitions
633 ))]
634 pub async fn get_bulk(
635 &self,
636 oids: &[Oid],
637 non_repeaters: i32,
638 max_repetitions: i32,
639 ) -> Result<Vec<VarBind>> {
640 let request_id = self.next_request_id();
641 let pdu = GetBulkPdu::new(request_id, non_repeaters, max_repetitions, oids);
642 let response = self.send_bulk_request(pdu).await?;
643 Ok(response.varbinds)
644 }
645
646 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
672 pub fn walk(&self, oid: Oid) -> Result<WalkStream<T>>
673 where
674 T: 'static,
675 {
676 let ordering = self.inner.config.oid_ordering;
677 let max_results = self.inner.config.max_walk_results;
678 let walk_mode = self.inner.config.walk_mode;
679 let max_repetitions = self.inner.config.max_repetitions as i32;
680 let version = self.inner.config.version;
681
682 WalkStream::new(
683 self.clone(),
684 oid,
685 version,
686 walk_mode,
687 ordering,
688 max_results,
689 max_repetitions,
690 )
691 }
692
693 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
716 pub fn walk_getnext(&self, oid: Oid) -> Walk<T>
717 where
718 T: 'static,
719 {
720 let ordering = self.inner.config.oid_ordering;
721 let max_results = self.inner.config.max_walk_results;
722 Walk::new(self.clone(), oid, ordering, max_results)
723 }
724
725 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid, snmp.max_repetitions = max_repetitions))]
751 pub fn bulk_walk(&self, oid: Oid, max_repetitions: i32) -> BulkWalk<T>
752 where
753 T: 'static,
754 {
755 let ordering = self.inner.config.oid_ordering;
756 let max_results = self.inner.config.max_walk_results;
757 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
758 }
759
760 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
778 pub fn bulk_walk_default(&self, oid: Oid) -> BulkWalk<T>
779 where
780 T: 'static,
781 {
782 let ordering = self.inner.config.oid_ordering;
783 let max_results = self.inner.config.max_walk_results;
784 let max_repetitions = self.inner.config.max_repetitions as i32;
785 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
786 }
787}