1mod auth;
4mod builder;
5mod v3;
6mod walk;
7
8pub use auth::{Auth, CommunityVersion, UsmAuth, UsmBuilder};
9pub use builder::ClientBuilder;
10
11impl Client<UdpTransport> {
13 pub fn builder(target: impl Into<String>, auth: impl Into<Auth>) -> ClientBuilder {
39 ClientBuilder::new(target, auth)
40 }
41}
42use crate::error::{DecodeErrorKind, Error, Result};
43use crate::message::{CommunityMessage, Message};
44use crate::oid::Oid;
45use crate::pdu::{GetBulkPdu, Pdu};
46use crate::transport::Transport;
47use crate::transport::UdpTransport;
48use crate::v3::{EngineCache, EngineState, SaltCounter};
49use crate::value::Value;
50use crate::varbind::VarBind;
51use crate::version::Version;
52use bytes::Bytes;
53use std::net::SocketAddr;
54use std::sync::Arc;
55use std::sync::RwLock;
56use std::sync::atomic::{AtomicI32, Ordering};
57use std::time::{Duration, Instant};
58use tracing::{Span, instrument};
59
60pub use v3::{V3DerivedKeys, V3SecurityConfig};
61pub use walk::{BulkWalk, OidOrdering, Walk, WalkMode, WalkStream};
62
/// Handle used to issue SNMP requests over a transport `T`.
///
/// The handle holds nothing but a reference-counted pointer to its shared
/// state, so the derived `Clone` produces another handle onto the same
/// transport, configuration and request-ID counter.
#[derive(Clone)]
pub struct Client<T: Transport = UdpTransport> {
    // Shared state behind the handle; see `ClientInner`.
    inner: Arc<ClientInner<T>>,
}
70
/// State shared by every clone of a `Client`.
struct ClientInner<T: Transport> {
    // Channel used to send encoded requests and receive responses.
    transport: T,
    // Settings fixed at construction time.
    config: ClientConfig,
    // Fallback request-ID counter (starts at 1); only consulted when the
    // transport's `alloc_request_id()` yields `None`.
    request_id: AtomicI32,
    // Starts out as `None`. Not read or written in this part of the file;
    // presumably maintained by the `v3` submodule - confirm there.
    engine_state: RwLock<Option<EngineState>>,
    // Starts out as `None`. Not read or written in this part of the file;
    // presumably maintained by the `v3` submodule - confirm there.
    derived_keys: RwLock<Option<V3DerivedKeys>>,
    // Created with `SaltCounter::new()`; not used in this part of the file.
    salt_counter: SaltCounter,
    // `Some` only when the client was built via `Client::with_engine_cache`.
    engine_cache: Option<Arc<EngineCache>>,
}
84
/// Settings that control how a `Client` builds, sends and retries requests.
///
/// See the `Default` implementation for the values used when a field is not
/// set explicitly.
#[derive(Clone)]
pub struct ClientConfig {
    /// Protocol version placed in outgoing community messages; a response
    /// carrying a different version is rejected.
    pub version: Version,
    /// Community string sent with requests that do not take the v3 path.
    pub community: Bytes,
    /// How long to wait for a response to each individual send attempt.
    pub timeout: Duration,
    /// Extra attempts made after an attempt times out. Treated as 0 when the
    /// transport reports itself as a stream.
    pub retries: u32,
    /// Largest number of varbinds put into a single GET/GETNEXT/SET request;
    /// the `*_many` methods split bigger inputs into batches of this size.
    /// Should be at least 1.
    pub max_oids_per_request: usize,
    /// SNMPv3 security settings. The v3 request path is taken only when this
    /// is `Some` *and* `version` is `Version::V3`.
    pub v3_security: Option<V3SecurityConfig>,
    /// Walk strategy handed to `WalkStream::new` by `Client::walk`.
    pub walk_mode: WalkMode,
    /// OID ordering policy handed to every walk created by the client.
    pub oid_ordering: OidOrdering,
    /// Optional limit handed to every walk created by the client
    /// (presumably a cap on yielded results; enforced by the walk types).
    pub max_walk_results: Option<usize>,
    /// max-repetitions value used by `Client::walk` and
    /// `Client::bulk_walk_default`.
    pub max_repetitions: u32,
}
109
110impl Default for ClientConfig {
111 fn default() -> Self {
112 Self {
113 version: Version::V2c,
114 community: Bytes::from_static(b"public"),
115 timeout: Duration::from_secs(5),
116 retries: 3,
117 max_oids_per_request: 10,
118 v3_security: None,
119 walk_mode: WalkMode::Auto,
120 oid_ordering: OidOrdering::Strict,
121 max_walk_results: None,
122 max_repetitions: 25,
123 }
124 }
125}
126
127impl<T: Transport> Client<T> {
128 pub fn new(transport: T, config: ClientConfig) -> Self {
130 Self {
131 inner: Arc::new(ClientInner {
132 transport,
133 config,
134 request_id: AtomicI32::new(1),
135 engine_state: RwLock::new(None),
136 derived_keys: RwLock::new(None),
137 salt_counter: SaltCounter::new(),
138 engine_cache: None,
139 }),
140 }
141 }
142
143 pub fn with_engine_cache(
145 transport: T,
146 config: ClientConfig,
147 engine_cache: Arc<EngineCache>,
148 ) -> Self {
149 Self {
150 inner: Arc::new(ClientInner {
151 transport,
152 config,
153 request_id: AtomicI32::new(1),
154 engine_state: RwLock::new(None),
155 derived_keys: RwLock::new(None),
156 salt_counter: SaltCounter::new(),
157 engine_cache: Some(engine_cache),
158 }),
159 }
160 }
161
162 pub fn peer_addr(&self) -> SocketAddr {
167 self.inner.transport.peer_addr()
168 }
169
170 fn next_request_id(&self) -> i32 {
175 self.inner
176 .transport
177 .alloc_request_id()
178 .unwrap_or_else(|| self.inner.request_id.fetch_add(1, Ordering::Relaxed))
179 }
180
181 fn is_v3(&self) -> bool {
183 self.inner.config.version == Version::V3 && self.inner.config.v3_security.is_some()
184 }
185
186 #[instrument(
188 level = "debug",
189 skip(self, data),
190 fields(
191 snmp.target = %self.peer_addr(),
192 snmp.request_id = request_id,
193 snmp.retries = tracing::field::Empty,
194 snmp.elapsed_ms = tracing::field::Empty,
195 )
196 )]
197 async fn send_and_recv(&self, request_id: i32, data: &[u8]) -> Result<Pdu> {
198 let start = Instant::now();
199 let mut last_error = None;
200 let retries = if self.inner.transport.is_stream() {
201 0
202 } else {
203 self.inner.config.retries
204 };
205
206 for attempt in 0..=retries {
207 Span::current().record("snmp.retries", attempt);
208 if attempt > 0 {
209 tracing::debug!("retrying request");
210 }
211
212 tracing::trace!(snmp.bytes = data.len(), "sending request");
214 self.inner.transport.send(data).await?;
215
216 match self
218 .inner
219 .transport
220 .recv(request_id, self.inner.config.timeout)
221 .await
222 {
223 Ok((response_data, _source)) => {
224 tracing::trace!(snmp.bytes = response_data.len(), "received response");
225
226 let response = Message::decode(response_data)?;
228
229 let response_version = response.version();
231 let expected_version = self.inner.config.version;
232 if response_version != expected_version {
233 return Err(Error::VersionMismatch {
234 expected: expected_version,
235 actual: response_version,
236 });
237 }
238
239 let response_pdu = response.into_pdu();
240
241 if response_pdu.request_id != request_id {
243 return Err(Error::RequestIdMismatch {
244 expected: request_id,
245 actual: response_pdu.request_id,
246 });
247 }
248
249 if response_pdu.is_error() {
251 let status = response_pdu.error_status_enum();
252 let oid = (response_pdu.error_index as usize)
254 .checked_sub(1)
255 .and_then(|idx| response_pdu.varbinds.get(idx))
256 .map(|vb| vb.oid.clone());
257
258 Span::current()
259 .record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
260 return Err(Error::Snmp {
261 target: Some(self.peer_addr()),
262 status,
263 index: response_pdu.error_index as u32,
264 oid,
265 });
266 }
267
268 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
269 return Ok(response_pdu);
270 }
271 Err(e @ Error::Timeout { .. }) => {
272 last_error = Some(e);
273 continue;
274 }
275 Err(e) => {
276 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
277 return Err(e);
278 }
279 }
280 }
281
282 Span::current().record("snmp.elapsed_ms", start.elapsed().as_millis() as u64);
284 Err(last_error.unwrap_or(Error::Timeout {
285 target: Some(self.peer_addr()),
286 elapsed: self.inner.config.timeout * (retries + 1),
287 request_id,
288 retries,
289 }))
290 }
291
292 async fn send_request(&self, pdu: Pdu) -> Result<Pdu> {
294 if self.is_v3() {
296 return self.send_v3_and_recv(pdu).await;
297 }
298
299 tracing::debug!(
300 snmp.pdu_type = ?pdu.pdu_type,
301 snmp.varbind_count = pdu.varbinds.len(),
302 "sending {} request",
303 pdu.pdu_type
304 );
305
306 let request_id = pdu.request_id;
307 let message = CommunityMessage::new(
308 self.inner.config.version,
309 self.inner.config.community.clone(),
310 pdu,
311 );
312 let data = message.encode();
313 let response = self.send_and_recv(request_id, &data).await?;
314
315 tracing::debug!(
316 snmp.pdu_type = ?response.pdu_type,
317 snmp.varbind_count = response.varbinds.len(),
318 snmp.error_status = response.error_status,
319 snmp.error_index = response.error_index,
320 "received {} response",
321 response.pdu_type
322 );
323
324 Ok(response)
325 }
326
327 async fn send_bulk_request(&self, pdu: GetBulkPdu) -> Result<Pdu> {
329 if self.is_v3() {
331 let pdu = Pdu::get_bulk(
333 pdu.request_id,
334 pdu.non_repeaters,
335 pdu.max_repetitions,
336 pdu.varbinds,
337 );
338 return self.send_v3_and_recv(pdu).await;
339 }
340
341 tracing::debug!(
342 snmp.non_repeaters = pdu.non_repeaters,
343 snmp.max_repetitions = pdu.max_repetitions,
344 snmp.varbind_count = pdu.varbinds.len(),
345 "sending GetBulkRequest"
346 );
347
348 let request_id = pdu.request_id;
349 let data = CommunityMessage::encode_bulk(
350 self.inner.config.version,
351 self.inner.config.community.clone(),
352 &pdu,
353 );
354 let response = self.send_and_recv(request_id, &data).await?;
355
356 tracing::debug!(
357 snmp.pdu_type = ?response.pdu_type,
358 snmp.varbind_count = response.varbinds.len(),
359 snmp.error_status = response.error_status,
360 snmp.error_index = response.error_index,
361 "received {} response",
362 response.pdu_type
363 );
364
365 Ok(response)
366 }
367
368 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
370 pub async fn get(&self, oid: &Oid) -> Result<VarBind> {
371 let request_id = self.next_request_id();
372 let pdu = Pdu::get_request(request_id, std::slice::from_ref(oid));
373 let response = self.send_request(pdu).await?;
374
375 response
376 .varbinds
377 .into_iter()
378 .next()
379 .ok_or_else(|| Error::decode(0, DecodeErrorKind::EmptyResponse))
380 }
381
382 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
403 pub async fn get_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
404 if oids.is_empty() {
405 return Ok(Vec::new());
406 }
407
408 let max_per_request = self.inner.config.max_oids_per_request;
409
410 if oids.len() <= max_per_request {
412 let request_id = self.next_request_id();
413 let pdu = Pdu::get_request(request_id, oids);
414 let response = self.send_request(pdu).await?;
415 return Ok(response.varbinds);
416 }
417
418 let num_batches = oids.len().div_ceil(max_per_request);
420 tracing::debug!(
421 snmp.oid_count = oids.len(),
422 snmp.max_per_request = max_per_request,
423 snmp.batch_count = num_batches,
424 "splitting GET request into batches"
425 );
426
427 let mut all_results = Vec::with_capacity(oids.len());
428
429 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
430 tracing::debug!(
431 snmp.batch = batch_idx + 1,
432 snmp.batch_total = num_batches,
433 snmp.batch_oid_count = chunk.len(),
434 "sending GET batch"
435 );
436 let request_id = self.next_request_id();
437 let pdu = Pdu::get_request(request_id, chunk);
438 let response = self.send_request(pdu).await?;
439 all_results.extend(response.varbinds);
440 }
441
442 Ok(all_results)
443 }
444
445 #[instrument(skip(self), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
447 pub async fn get_next(&self, oid: &Oid) -> Result<VarBind> {
448 let request_id = self.next_request_id();
449 let pdu = Pdu::get_next_request(request_id, std::slice::from_ref(oid));
450 let response = self.send_request(pdu).await?;
451
452 response
453 .varbinds
454 .into_iter()
455 .next()
456 .ok_or_else(|| Error::decode(0, DecodeErrorKind::EmptyResponse))
457 }
458
459 #[instrument(skip(self, oids), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = oids.len()))]
479 pub async fn get_next_many(&self, oids: &[Oid]) -> Result<Vec<VarBind>> {
480 if oids.is_empty() {
481 return Ok(Vec::new());
482 }
483
484 let max_per_request = self.inner.config.max_oids_per_request;
485
486 if oids.len() <= max_per_request {
488 let request_id = self.next_request_id();
489 let pdu = Pdu::get_next_request(request_id, oids);
490 let response = self.send_request(pdu).await?;
491 return Ok(response.varbinds);
492 }
493
494 let num_batches = oids.len().div_ceil(max_per_request);
496 tracing::debug!(
497 snmp.oid_count = oids.len(),
498 snmp.max_per_request = max_per_request,
499 snmp.batch_count = num_batches,
500 "splitting GETNEXT request into batches"
501 );
502
503 let mut all_results = Vec::with_capacity(oids.len());
504
505 for (batch_idx, chunk) in oids.chunks(max_per_request).enumerate() {
506 tracing::debug!(
507 snmp.batch = batch_idx + 1,
508 snmp.batch_total = num_batches,
509 snmp.batch_oid_count = chunk.len(),
510 "sending GETNEXT batch"
511 );
512 let request_id = self.next_request_id();
513 let pdu = Pdu::get_next_request(request_id, chunk);
514 let response = self.send_request(pdu).await?;
515 all_results.extend(response.varbinds);
516 }
517
518 Ok(all_results)
519 }
520
521 #[instrument(skip(self, value), err, fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
523 pub async fn set(&self, oid: &Oid, value: Value) -> Result<VarBind> {
524 let request_id = self.next_request_id();
525 let varbind = VarBind::new(oid.clone(), value);
526 let pdu = Pdu::set_request(request_id, vec![varbind]);
527 let response = self.send_request(pdu).await?;
528
529 response
530 .varbinds
531 .into_iter()
532 .next()
533 .ok_or_else(|| Error::decode(0, DecodeErrorKind::EmptyResponse))
534 }
535
536 #[instrument(skip(self, varbinds), err, fields(snmp.target = %self.peer_addr(), snmp.oid_count = varbinds.len()))]
556 pub async fn set_many(&self, varbinds: &[(Oid, Value)]) -> Result<Vec<VarBind>> {
557 if varbinds.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let max_per_request = self.inner.config.max_oids_per_request;
562
563 if varbinds.len() <= max_per_request {
565 let request_id = self.next_request_id();
566 let vbs: Vec<VarBind> = varbinds
567 .iter()
568 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
569 .collect();
570 let pdu = Pdu::set_request(request_id, vbs);
571 let response = self.send_request(pdu).await?;
572 return Ok(response.varbinds);
573 }
574
575 let num_batches = varbinds.len().div_ceil(max_per_request);
577 tracing::debug!(
578 snmp.oid_count = varbinds.len(),
579 snmp.max_per_request = max_per_request,
580 snmp.batch_count = num_batches,
581 "splitting SET request into batches"
582 );
583
584 let mut all_results = Vec::with_capacity(varbinds.len());
585
586 for (batch_idx, chunk) in varbinds.chunks(max_per_request).enumerate() {
587 tracing::debug!(
588 snmp.batch = batch_idx + 1,
589 snmp.batch_total = num_batches,
590 snmp.batch_oid_count = chunk.len(),
591 "sending SET batch"
592 );
593 let request_id = self.next_request_id();
594 let vbs: Vec<VarBind> = chunk
595 .iter()
596 .map(|(oid, value)| VarBind::new(oid.clone(), value.clone()))
597 .collect();
598 let pdu = Pdu::set_request(request_id, vbs);
599 let response = self.send_request(pdu).await?;
600 all_results.extend(response.varbinds);
601 }
602
603 Ok(all_results)
604 }
605
606 #[instrument(skip(self, oids), err, fields(
631 snmp.target = %self.peer_addr(),
632 snmp.oid_count = oids.len(),
633 snmp.non_repeaters = non_repeaters,
634 snmp.max_repetitions = max_repetitions
635 ))]
636 pub async fn get_bulk(
637 &self,
638 oids: &[Oid],
639 non_repeaters: i32,
640 max_repetitions: i32,
641 ) -> Result<Vec<VarBind>> {
642 let request_id = self.next_request_id();
643 let pdu = GetBulkPdu::new(request_id, non_repeaters, max_repetitions, oids);
644 let response = self.send_bulk_request(pdu).await?;
645 Ok(response.varbinds)
646 }
647
648 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
674 pub fn walk(&self, oid: Oid) -> Result<WalkStream<T>>
675 where
676 T: 'static,
677 {
678 let ordering = self.inner.config.oid_ordering;
679 let max_results = self.inner.config.max_walk_results;
680 let walk_mode = self.inner.config.walk_mode;
681 let max_repetitions = self.inner.config.max_repetitions as i32;
682 let version = self.inner.config.version;
683
684 WalkStream::new(
685 self.clone(),
686 oid,
687 version,
688 walk_mode,
689 ordering,
690 max_results,
691 max_repetitions,
692 )
693 }
694
695 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
718 pub fn walk_getnext(&self, oid: Oid) -> Walk<T>
719 where
720 T: 'static,
721 {
722 let ordering = self.inner.config.oid_ordering;
723 let max_results = self.inner.config.max_walk_results;
724 Walk::new(self.clone(), oid, ordering, max_results)
725 }
726
727 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid, snmp.max_repetitions = max_repetitions))]
753 pub fn bulk_walk(&self, oid: Oid, max_repetitions: i32) -> BulkWalk<T>
754 where
755 T: 'static,
756 {
757 let ordering = self.inner.config.oid_ordering;
758 let max_results = self.inner.config.max_walk_results;
759 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
760 }
761
762 #[instrument(skip(self), fields(snmp.target = %self.peer_addr(), snmp.oid = %oid))]
780 pub fn bulk_walk_default(&self, oid: Oid) -> BulkWalk<T>
781 where
782 T: 'static,
783 {
784 let ordering = self.inner.config.oid_ordering;
785 let max_results = self.inner.config.max_walk_results;
786 let max_repetitions = self.inner.config.max_repetitions as i32;
787 BulkWalk::new(self.clone(), oid, max_repetitions, ordering, max_results)
788 }
789}