1mod auth;
4mod builder;
5mod retry;
6mod v3;
7mod walk;
8
9pub use auth::{Auth, CommunityVersion, UsmAuth, UsmBuilder};
10pub use builder::ClientBuilder;
11pub use retry::{Backoff, Retry, RetryBuilder};
12
13impl Client<UdpHandle> {
15 pub fn builder(target: impl Into<String>, auth: impl Into<Auth>) -> ClientBuilder {
41 ClientBuilder::new(target, auth)
42 }
43}
44use crate::error::internal::DecodeErrorKind;
45use crate::error::{Error, Result};
46use crate::message::{CommunityMessage, Message};
47use crate::oid::Oid;
48use crate::pdu::{GetBulkPdu, Pdu};
49use crate::transport::Transport;
50use crate::transport::UdpHandle;
51use crate::v3::{EngineCache, EngineState, SaltCounter};
52use crate::value::Value;
53use crate::varbind::VarBind;
54use crate::version::Version;
55use bytes::Bytes;
56use std::net::SocketAddr;
57use std::sync::Arc;
58use std::sync::RwLock;
59use std::time::{Duration, Instant};
60use tracing::{Span, instrument};
61
62pub use v3::{V3DerivedKeys, V3SecurityConfig};
63pub use walk::{BulkWalk, OidOrdering, Walk, WalkMode, WalkStream};
64
65#[derive(Clone)]
69pub struct Client<T: Transport = UdpHandle> {
70 inner: Arc<ClientInner<T>>,
71}
72
73struct ClientInner<T: Transport> {
74 transport: T,
75 config: ClientConfig,
76 engine_state: RwLock<Option<EngineState>>,
78 derived_keys: RwLock<Option<V3DerivedKeys>>,
80 salt_counter: SaltCounter,
82 engine_cache: Option<Arc<EngineCache>>,
84}
85
86#[derive(Clone)]
90pub struct ClientConfig {
91 pub version: Version,
93 pub community: Bytes,
95 pub timeout: Duration,
97 pub retry: Retry,
99 pub max_oids_per_request: usize,
101 pub v3_security: Option<V3SecurityConfig>,
103 pub walk_mode: WalkMode,
105 pub oid_ordering: OidOrdering,
107 pub max_walk_results: Option<usize>,
109 pub max_repetitions: u32,
111}
112
113impl Default for ClientConfig {
114 fn default() -> Self {
118 Self {
119 version: Version::V2c,
120 community: Bytes::from_static(b"public"),
121 timeout: Duration::from_secs(5),
122 retry: Retry::default(),
123 max_oids_per_request: 10,
124 v3_security: None,
125 walk_mode: WalkMode::Auto,
126 oid_ordering: OidOrdering::Strict,
127 max_walk_results: None,
128 max_repetitions: 25,
129 }
130 }
131}
132
133impl<T: Transport> Client<T> {
134 pub fn new(transport: T, config: ClientConfig) -> Self {
136 Self {
137 inner: Arc::new(ClientInner {
138 transport,
139 config,
140 engine_state: RwLock::new(None),
141 derived_keys: RwLock::new(None),
142 salt_counter: SaltCounter::new(),
143 engine_cache: None,
144 }),
145 }
146 }
147
148 pub fn with_engine_cache(
150 transport: T,
151 config: ClientConfig,
152 engine_cache: Arc<EngineCache>,
153 ) -> Self {
154 Self {
155 inner: Arc::new(ClientInner {
156 transport,
157 config,
158 engine_state: RwLock::new(None),
159 derived_keys: RwLock::new(None),
160 salt_counter: SaltCounter::new(),
161 engine_cache: Some(engine_cache),
162 }),
163 }
164 }
165
166 pub fn peer_addr(&self) -> SocketAddr {
171 self.inner.transport.peer_addr()
172 }
173
174 fn next_request_id(&self) -> i32 {
178 self.inner.transport.alloc_request_id()
179 }
180
181 fn is_v3(&self) -> bool {
183 self.inner.config.version == Version::V3 && self.inner.config.v3_security.is_some()
184 }
185
186 #[instrument(
188 level = "debug",
189 skip(self, data),
190 fields(
191 snmp.target = %self.peer_addr(),
192 snmp.request_id = request_id,
193 snmp.attempt = tracing::field::Empty,
194 snmp.elapsed_ms = tracing::field::Empty,
195 )
196 )]
197 async fn send_and_recv(&self, request_id: i32, data: &[u8]) -> Result<Pdu> {
198 let start = Instant::now();
199 let mut last_error: Option<Box<Error>> = None;
200 let max_attempts = if self.inner.transport.is_reliable() {
201 0
202 } else {
203 self.inner.config.retry.max_attempts
204 };
205
206 for attempt in 0..=max_attempts {
207 Span::current().record("snmp.attempt", attempt);
208 if attempt > 0 {
209 tracing::debug!(target: "async_snmp::client", "retrying request");
210 }
211
212 self.inner
214 .transport
215 .register_request(request_id, self.inner.config.timeout);
216
217 tracing::trace!(target: "async_snmp::client", { snmp.bytes = data.len() }, "sending request");
219 self.inner.transport.send(data).await?;
220
221 match self.inner.transport.recv(request_id).await {
223 Ok((response_data, _source)) => {
224 tracing::trace!(target: "async_snmp::client", { snmp.bytes = response_data.len() }, "received response");
225
226 let response = Message::decode(response_data)?;
228
229 let response_version = response.version();
231 let expected_version = self.inner.config.version;
232 if response_version != expected_version {
233 tracing::warn!(target: "async_snmp::client", { ?expected_version, ?response_version, peer = %self.peer_addr() }, "version mismatch in response");
234 return Err(Error::MalformedResponse {
235 target: self.peer_addr(),
236 }
237 .boxed());
238 }
239
240 let response_pdu = response.into_pdu();
241
242 if response_pdu.request_id != request_id {
244 tracing::warn!(target: "async_snmp::client", { expected_request_id = request_id, actual_request_id = response_pdu.request_id, peer = %self.peer_addr() }, "request ID mismatch in response");
245 return Err(Error::MalformedResponse {
246 target: self.peer_addr(),
247 }
248 .boxed());
249 }
250
251 if response_pdu.is_error() {
253 let status = response_pdu.error_status_enum();
254 let oid = (response_pdu.error_index as usize)
256 .checked_sub(1)
257 .and_then(|idx| response_pdu.varbinds.get(idx))
258 .map(|vb| vb.oid.clone());
259
260 Span::current()
261 .record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
262 return Err(Error::Snmp {
263 target: self.peer_addr(),
264 status,
265 index: response_pdu.error_index.max(0) as u32,
266 oid,
267 }
268 .boxed());
269 }
270
271 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
272 return Ok(response_pdu);
273 }
274 Err(e) if matches!(*e, Error::Timeout { .. }) => {
275 last_error = Some(e);
276 if attempt < max_attempts {
278 let delay = self.inner.config.retry.compute_delay(attempt);
279 if !delay.is_zero() {
280 tracing::debug!(target: "async_snmp::client", { delay_ms = delay.as_millis() as u64 }, "backing off");
281 tokio::time::sleep(delay).await;
282 }
283 }
284 continue;
285 }
286 Err(e) => {
287 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
288 return Err(e);
289 }
290 }
291 }
292
293 let elapsed = start.elapsed();
295 Span::current().record("snmp.elapsed_ms", elapsed.as_millis() as u64);
296 tracing::debug!(target: "async_snmp::client", { request_id, peer = %self.peer_addr(), ?elapsed, retries = max_attempts }, "request timed out");
297 Err(last_error.unwrap_or_else(|| {
298 Error::Timeout {
299 target: self.peer_addr(),
300 elapsed,
301 retries: max_attempts,
302 }
303 .boxed()
304 }))
305 }
306
307 async fn send_request(&self, pdu: Pdu) -> Result<Pdu> {
309 if self.is_v3() {
311 return self.send_v3_and_recv(pdu).await;
312 }
313
314 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?pdu.pdu_type, snmp.varbind_count = pdu.varbinds.len() }, "sending {} request", pdu.pdu_type);
315
316 let request_id = pdu.request_id;
317 let message = CommunityMessage::new(
318 self.inner.config.version,
319 self.inner.config.community.clone(),
320 pdu,
321 );
322 let data = message.encode();
323 let response = self.send_and_recv(request_id, &data).await?;
324
325 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
326
327 Ok(response)
328 }
329
330 async fn send_bulk_request(&self, pdu: GetBulkPdu) -> Result<Pdu> {
332 if self.is_v3() {
334 let pdu = Pdu::get_bulk(
336 pdu.request_id,
337 pdu.non_repeaters,
338 pdu.max_repetitions,
339 pdu.varbinds,
340 );
341 return self.send_v3_and_recv(pdu).await;
342 }
343
344 tracing::debug!(target: "async_snmp::client", { snmp.non_repeaters = pdu.non_repeaters, snmp.max_repetitions = pdu.max_repetitions, snmp.varbind_count = pdu.varbinds.len() }, "sending GetBulkRequest");
345
346 let request_id = pdu.request_id;
347 let data = CommunityMessage::encode_bulk(
348 self.inner.config.version,
349 self.inner.config.community.clone(),
350 &pdu,
351 );
352 let response = self.send_and_recv(request_id, &data).await?;
353
354 tracing::debug!(target: "async_snmp::client", { snmp.pdu_type = ?response.pdu_type, snmp.varbind_count = response.varbinds.len(), snmp.error_status = response.error_status, snmp.error_index = response.error_index }, "received {} response", response.pdu_type);
355
356 Ok(response)
357 }
358
359 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
361 pub async fn get(&self, oid: &Oid) -> Result<VarBind> {
362 let request_id = self.next_request_id();
363 let pdu = Pdu::get_request(request_id, std::slice::from_ref(oid));
364 let response = self.send_request(pdu).await?;
365
366 response.varbinds.into_iter().next().ok_or_else(|| {
367 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GET response");
368 Error::MalformedResponse {
369 target: self.peer_addr(),
370 }
371 .boxed()
372 })
373 }
374
375 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
396 pub async fn get_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
397 if oids.is_empty() {
398 return Ok(Vec::new());
399 }
400
401 let max_per_request = self.inner.config.max_oids_per_request;
402
403 if oids.len() <= max_per_request {
405 let request_id = self.next_request_id();
406 let pdu = Pdu::get_request(request_id, oids);
407 let response = self.send_request(pdu).await?;
408 return Ok(response.varbinds);
409 }
410
411 let num_batches = oids.len().div_ceil(max_per_request);
413 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GET request into batches");
414
415 let mut all_results = Vec::with_capacity(oids.len());
416
417 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
418 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GET batch");
419 let request_id = self.next_request_id();
420 let pdu = Pdu::get_request(request_id, chunk);
421 let response = self.send_request(pdu).await?;
422 all_results.extend(response.varbinds);
423 }
424
425 Ok(all_results)
426 }
427
428 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
430 pub async fn get_next(&self, oid: &Oid) -> Result<VarBind> {
431 let request_id = self.next_request_id();
432 let pdu = Pdu::get_next_request(request_id, std::slice::from_ref(oid));
433 let response = self.send_request(pdu).await?;
434
435 response.varbinds.into_iter().next().ok_or_else(|| {
436 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty GETNEXT response");
437 Error::MalformedResponse {
438 target: self.peer_addr(),
439 }
440 .boxed()
441 })
442 }
443
444 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
464 pub async fn get_next_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
465 if oids.is_empty() {
466 return Ok(Vec::new());
467 }
468
469 let max_per_request = self.inner.config.max_oids_per_request;
470
471 if oids.len() <= max_per_request {
473 let request_id = self.next_request_id();
474 let pdu = Pdu::get_next_request(request_id, oids);
475 let response = self.send_request(pdu).await?;
476 return Ok(response.varbinds);
477 }
478
479 let num_batches = oids.len().div_ceil(max_per_request);
481 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = oids.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting GETNEXT request into batches");
482
483 let mut all_results = Vec::with_capacity(oids.len());
484
485 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
486 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending GETNEXT batch");
487 let request_id = self.next_request_id();
488 let pdu = Pdu::get_next_request(request_id, chunk);
489 let response = self.send_request(pdu).await?;
490 all_results.extend(response.varbinds);
491 }
492
493 Ok(all_results)
494 }
495
496 #[instrument(skip(self, value), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
498 pub async fn set(&self, oid: &Oid, value: Value) -> Result<VarBind> {
499 let request_id = self.next_request_id();
500 let varbind = VarBind::new(oid.clone(), value);
501 let pdu = Pdu::set_request(request_id, vec![varbind]);
502 let response = self.send_request(pdu).await?;
503
504 response.varbinds.into_iter().next().ok_or_else(|| {
505 tracing::debug!(target: "async_snmp::client", { peer = %self.peer_addr(), kind = %DecodeErrorKind::EmptyResponse }, "empty SET response");
506 Error::MalformedResponse {
507 target: self.peer_addr(),
508 }
509 .boxed()
510 })
511 }
512
513 #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = varbinds.len()))]
533 pub async fn set_many(&self, varbinds: &[(Oid, Value)]) -> Result<Vec<VarBind>> {
534 if varbinds.is_empty() {
535 return Ok(Vec::new());
536 }
537
538 let max_per_request = self.inner.config.max_oids_per_request;
539
540 if varbinds.len() <= max_per_request {
542 let request_id = self.next_request_id();
543 let vbs: Vec<VarBind> = varbinds
544 .iter()
545 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
546 .collect();
547 let pdu = Pdu::set_request(request_id, vbs);
548 let response = self.send_request(pdu).await?;
549 return Ok(response.varbinds);
550 }
551
552 let num_batches = varbinds.len().div_ceil(max_per_request);
554 tracing::debug!(target: "async_snmp::client", { snmp.oid_count = varbinds.len(), snmp.max_per_request = max_per_request, snmp.batch_count = num_batches }, "splitting SET request into batches");
555
556 let mut all_results = Vec::with_capacity(varbinds.len());
557
558 for (batch_idx, chunk) in varbinds.chunks(max_per_request).enumerate() {
559 tracing::debug!(target: "async_snmp::client", { snmp.batch = batch_idx + 1, snmp.batch_total = num_batches, snmp.batch_oid_count = chunk.len() }, "sending SET batch");
560 let request_id = self.next_request_id();
561 let vbs: Vec<VarBind> = chunk
562 .iter()
563 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
564 .collect();
565 let pdu = Pdu::set_request(request_id, vbs);
566 let response = self.send_request(pdu).await?;
567 all_results.extend(response.varbinds);
568 }
569
570 Ok(all_results)
571 }
572
573 #[instrument(skip(self, oids), err, fields(
606 snmp.target = %self.peer_addr(),
607 snmp.oid_count = oids.len(),
608 snmp.non_repeaters = non_repeaters,
609 snmp.max_repetitions = max_repetitions
610 ))]
611 pub async fn get_bulk(
612 &self,
613 oids: &[Oid],
614 non_repeaters: i32,
615 max_repetitions: i32,
616 ) -> Result<Vec<VarBind>> {
617 let request_id = self.next_request_id();
618 let pdu = GetBulkPdu::new(request_id, non_repeaters, max_repetitions, oids);
619 let response = self.send_bulk_request(pdu).await?;
620 Ok(response.varbinds)
621 }
622
623 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
649 pub fn walk(&self, oid: Oid) -> Result<WalkStream<T>>
650 where
651 T: 'static,
652 {
653 let ordering = self.inner.config.oid_ordering;
654 let max_results = self.inner.config.max_walk_results;
655 let walk_mode = self.inner.config.walk_mode;
656 let max_repetitions = self.inner.config.max_repetitions as i32;
657 let version = self.inner.config.version;
658
659 WalkStream::new(
660 self.clone(),
661 oid,
662 version,
663 walk_mode,
664 ordering,
665 max_results,
666 max_repetitions,
667 )
668 }
669
670 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
693 pub fn walk_getnext(&self, oid: Oid) -> Walk<T>
694 where
695 T: 'static,
696 {
697 let ordering = self.inner.config.oid_ordering;
698 let max_results = self.inner.config.max_walk_results;
699 Walk::new(self.clone(), oid, ordering, max_results)
700 }
701
702 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid, snmp.max_repetitions = max_repetitions))]
728 pub fn bulk_walk(&self, oid: Oid, max_repetitions: i32) -> BulkWalk<T>
729 where
730 T: 'static,
731 {
732 let ordering = self.inner.config.oid_ordering;
733 let max_results = self.inner.config.max_walk_results;
734 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
735 }
736
737 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
755 pub fn bulk_walk_default(&self, oid: Oid) -> BulkWalk<T>
756 where
757 T: 'static,
758 {
759 let ordering = self.inner.config.oid_ordering;
760 let max_results = self.inner.config.max_walk_results;
761 let max_repetitions = self.inner.config.max_repetitions as i32;
762 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
763 }
764}