use core::{
    fmt::{self, Debug, Display},
    result,
};
use std::{
    io,
    marker::PhantomData,
    num::{NonZero, NonZeroU32},
    ops::AddAssign,
    pin::Pin,
    sync::{Arc, LazyLock, Mutex, PoisonError},
    time::{Duration, SystemTime},
};

use bytes::Bytes;
use governor::{DefaultDirectRateLimiter, InsufficientCapacity, Jitter, Quota, RateLimiter};
use human_units::{
    FormatDuration,
    iec::{Byte, Prefix},
};
use nonzero_ext::nonzero;
use opentelemetry::{
    InstrumentationScope, KeyValue, global,
    metrics::{Counter, Meter},
};
use opentelemetry_otlp::ExporterBuildError;
use opentelemetry_sdk::{
    error::{OTelSdkError, OTelSdkResult},
    metrics::{
        SdkMeterProvider, Temporality,
        data::{AggregatedMetrics, Histogram, Metric, MetricData, ResourceMetrics},
        exporter::PushMetricExporter,
    },
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tansu_client::{Client, ConnectionManager};
use tansu_sans_io::{
    ErrorCode, ProduceRequest,
    primitive::ByteSize as _,
    produce_request::{PartitionProduceData, TopicProduceData},
    record::{Record, deflated, inflated},
};
use tokio::{
    signal::unix::{SignalKind, signal},
    task::JoinSet,
    time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument};
use url::Url;

pub type Result<T, E = Error> = result::Result<T, E>;

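/// Meter scoped to this crate's name, version and OpenTelemetry schema URL.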
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});

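/// Errors that can occur while configuring or running the benchmark.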
#[derive(thiserror::Error, Debug)]
pub enum Error {
    Api(ErrorCode),
    Client(#[from] tansu_client::Error),
    ExporterBuild(#[from] ExporterBuildError),
    InsufficientCapacity(#[from] InsufficientCapacity),
    Io(Arc<io::Error>),
    OtelSdk(#[from] OTelSdkError),
    Random(#[from] getrandom::Error),
    Poison,
    Protocol(#[from] tansu_sans_io::Error),
    UnknownHost(String),
    Url(#[from] url::ParseError),
}

impl<T> From<PoisonError<T>> for Error {
    fn from(_value: PoisonError<T>) -> Self {
        Self::Poison
    }
}

impl From<io::Error> for Error {
    fn from(value: io::Error) -> Self {
        Self::Io(Arc::new(value))
    }
}

impl Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}

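/// Why the benchmark stopped: a signal, or the configured duration elapsing.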
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CancelKind {
    Interrupt,
    Terminate,
    Timeout,
}

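/// A configured produce benchmark run, created via [`Perf::builder`].
///
/// A minimal sketch of driving a run (the broker URL and topic name here are
/// illustrative, not defaults):
///
/// ```ignore
/// let code = Perf::builder()
///     .broker(Url::parse("tcp://localhost:9092")?)
///     .topic("bench")
///     .batch_size(100)
///     .record_size(1024)
///     .producers(4)
///     .duration(Some(Duration::from_secs(60)))
///     .build()
///     .main()
///     .await?;
/// ```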
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Perf {
    broker: Url,
    topic: String,
    partition: i32,
    batch_size: u32,
    record_size: usize,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
}

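/// Type-state builder for [`Perf`]: `build` is only available once both the
/// broker URL and the topic name have been supplied.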
#[derive(Clone, Debug)]
pub struct Builder<B, T> {
    broker: B,
    topic: T,
    partition: i32,
    batch_size: u32,
    record_size: usize,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
}

impl Default for Builder<PhantomData<Url>, PhantomData<String>> {
    fn default() -> Self {
        Self {
            broker: Default::default(),
            topic: Default::default(),
            partition: Default::default(),
            batch_size: 1,
            record_size: 1024,
            per_second: None,
            throughput: None,
            producers: 1,
            duration: None,
        }
    }
}

impl<B, T> Builder<B, T> {
    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T> {
        Builder {
            broker: broker.into(),
            topic: self.topic,
            partition: self.partition,
            batch_size: self.batch_size,
            record_size: self.record_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
        }
    }

    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String> {
        Builder {
            broker: self.broker,
            topic: topic.into(),
            partition: self.partition,
            batch_size: self.batch_size,
            record_size: self.record_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
        }
    }

    pub fn partition(self, partition: i32) -> Builder<B, T> {
        Self { partition, ..self }
    }

    pub fn batch_size(self, batch_size: u32) -> Self {
        Self { batch_size, ..self }
    }

    pub fn record_size(self, record_size: usize) -> Self {
        Self {
            record_size,
            ..self
        }
    }

    pub fn per_second(self, per_second: Option<u32>) -> Self {
        Self { per_second, ..self }
    }

    pub fn throughput(self, throughput: Option<u32>) -> Self {
        Self { throughput, ..self }
    }

    pub fn producers(self, producers: u32) -> Self {
        Self { producers, ..self }
    }

    pub fn duration(self, duration: Option<Duration>) -> Self {
        Self { duration, ..self }
    }
}

impl Builder<Url, String> {
    pub fn build(self) -> Perf {
        Perf {
            broker: self.broker,
            topic: self.topic,
            partition: self.partition,
            batch_size: self.batch_size,
            record_size: self.record_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
        }
    }
}

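// Metrics recorded by the producers and collected through the periodic
// `MetricExporter` below.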
static RATE_LIMIT_DURATION: LazyLock<opentelemetry::metrics::Histogram<u64>> =
    LazyLock::new(|| {
        METER
            .u64_histogram("rate_limit_duration")
            .with_unit("ms")
            .with_description("Rate limit latencies in milliseconds")
            .build()
    });

static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("produce_record_count")
        .with_description("Produced record count")
        .build()
});

static PRODUCE_API_DURATION: LazyLock<opentelemetry::metrics::Histogram<u64>> =
    LazyLock::new(|| {
        METER
            .u64_histogram("produce_duration")
            .with_unit("ms")
            .with_description("Produce API latencies in milliseconds")
            .build()
    });

impl Perf {
    pub fn builder() -> Builder<PhantomData<Url>, PhantomData<String>> {
        Builder::default()
    }

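    /// Run the benchmark: spawn the configured number of producers, report
    /// metrics periodically, and stop on SIGINT, SIGTERM or when the optional
    /// duration elapses.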
    pub async fn main(self) -> Result<ErrorCode> {
        let token = CancellationToken::new();

        let meter_provider = {
            let exporter = MetricExporter::new(token.clone());
            let meter_provider = SdkMeterProvider::builder()
                .with_periodic_exporter(exporter)
                .build();
            global::set_meter_provider(meter_provider.clone());

            meter_provider
        };

        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
        debug!(?interrupt_signal);

        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
        debug!(?terminate_signal);

        let rate_limiter = self
            .per_second
            .or(self.throughput)
            .inspect(|limit| debug!(?limit))
            .and_then(NonZeroU32::new)
            .map(Quota::per_second)
            .map(RateLimiter::direct)
            .map(Arc::new)
            .inspect(|rate_limiter| debug!(?rate_limiter));

        let record_data = {
            let mut data = vec![0u8; self.record_size];
            getrandom::fill(&mut data)?;
            Bytes::from(data)
        };

        let batch_size = NonZeroU32::new(self.batch_size)
            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
            .unwrap_or(nonzero!(10u32));

        let mut set = JoinSet::new();

        let client = ConnectionManager::builder(self.broker)
            .client_id(Some(env!("CARGO_PKG_NAME").into()))
            .build()
            .await
            .inspect(|pool| debug!(?pool))
            .map(Client::new)?;

        for id in 0..self.producers {
            let producer = Producer {
                id,
                rate_limiter: rate_limiter.clone(),
                topic: self.topic.clone(),
                partition: self.partition,
                record_data: record_data.clone(),
                token: token.clone(),
                client: client.clone(),
                batch_size,
                throughput: self.throughput,
            };

            _ = set.spawn(async move {
                loop {
                    match producer.rate_limited().await {
                        Ok(false) | Err(_) => break,
                        _ => continue,
                    }
                }
            });
        }

        let join_all = async {
            while !set.is_empty() {
                debug!(len = set.len());
                _ = set.join_next().await;
            }
        };

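        // Either sleep for the configured duration or wait forever when no
        // duration is set; both arms are boxed so the select! below sees a
        // single future type.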
        let duration = self
            .duration
            .map(sleep)
            .map(Box::pin)
            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
            .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);

        let cancellation = tokio::select! {

            timeout = duration => {
                debug!(?timeout);
                token.cancel();
                Some(CancelKind::Timeout)
            }

            completed = join_all => {
                debug!(?completed);
                None
            }

            interrupt = interrupt_signal.recv() => {
                debug!(?interrupt);
                Some(CancelKind::Interrupt)
            }

            terminate = terminate_signal.recv() => {
                debug!(?terminate);
                Some(CancelKind::Terminate)
            }

        };

        debug!(?cancellation);

        meter_provider
            .shutdown()
            .inspect(|shutdown| debug!(?shutdown))?;

        if let Some(CancelKind::Timeout) = cancellation {
            sleep(Duration::from_secs(5)).await;
        }

        debug!(abort = set.len());
        set.abort_all();

        while !set.is_empty() {
            _ = set.join_next().await;
        }

        Ok(ErrorCode::None)
    }
}

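/// One producer task: builds record batches, optionally waits on the shared
/// rate limiter, and issues Produce requests until cancelled.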
#[derive(Clone, Debug)]
struct Producer {
    id: u32,
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    topic: String,
    partition: i32,
    record_data: Bytes,
    token: CancellationToken,
    client: Client,
    batch_size: NonZero<u32>,
    throughput: Option<u32>,
}

impl Producer {
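    /// Build a single deflated record batch containing `batch_size` copies of
    /// the random payload.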
    #[instrument(skip_all, fields(record_data_len = self.record_data.len()))]
    fn frame(&self) -> Result<deflated::Frame> {
        let mut batch = inflated::Batch::builder();
        let offset_deltas = 0..(self.batch_size.get() as i32);

        for offset_delta in offset_deltas {
            batch = batch.record(
                Record::builder()
                    .value(Some(self.record_data.clone()))
                    .offset_delta(offset_delta),
            )
        }

        batch
            .last_offset_delta(self.batch_size.get() as i32)
            .build()
            .map(|batch| inflated::Frame {
                batches: vec![batch],
            })
            .and_then(deflated::Frame::try_from)
            .map_err(Into::into)
    }

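    /// Send one batch to the configured topic partition and assert that every
    /// partition response carries `ErrorCode::None`.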
    #[instrument(skip_all)]
    async fn produce(&self, frame: deflated::Frame) -> Result<()> {
        let req = ProduceRequest::default().topic_data(Some(
            [TopicProduceData::default()
                .name(self.topic.clone())
                .partition_data(Some(
                    [PartitionProduceData::default()
                        .index(self.partition)
                        .records(Some(frame))]
                    .into(),
                ))]
            .into(),
        ));

        let response = self.client.call(req).await?;

        assert!(
            response
                .responses
                .unwrap_or_default()
                .into_iter()
                .all(|topic| {
                    topic
                        .partition_responses
                        .unwrap_or_default()
                        .iter()
                        .all(|partition| partition.error_code == i16::from(ErrorCode::None))
                })
        );

        Ok(())
    }

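    /// Produce one batch, first waiting on the rate limiter if one is
    /// configured. Returns `Ok(false)` once the cancellation token fires so
    /// the caller's loop can exit.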
    #[instrument(skip_all, fields(id = self.id))]
    async fn rate_limited(&self) -> Result<bool> {
        let attributes = [KeyValue::new("producer", self.id.to_string())];

        let frame = self.frame()?;

        if let Some(ref rate_limiter) = self.rate_limiter {
            let rate_limit_start = SystemTime::now();

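            // When limiting by throughput the cost is the frame size in bytes;
            // otherwise one cell is charged per record in the batch.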
            let cells = self
                .throughput
                .and(
                    frame
                        .size_in_bytes()
                        .ok()
                        .and_then(|bytes| NonZeroU32::new(bytes as u32)),
                )
                .unwrap_or(self.batch_size);

            tokio::select! {
                cancelled = self.token.cancelled() => {
                    debug!(?cancelled);
                    return Ok(false)
                },

                Ok(_) = rate_limiter.until_n_ready_with_jitter(cells, Jitter::up_to(Duration::from_millis(10))) => {
                    RATE_LIMIT_DURATION.record(
                        rate_limit_start
                            .elapsed()
                            .inspect(|duration| debug!(rate_limit_duration_ms = duration.as_millis()))
                            .map_or(0, |duration| duration.as_millis() as u64),
                        &attributes,
                    )
                },
            }
        }

        let produce_start = SystemTime::now();

        tokio::select! {
            cancelled = self.token.cancelled() => {
                debug!(?cancelled);
                return Ok(false)
            },

            Ok(_) = self.produce(frame) => {
                PRODUCE_RECORD_COUNT.add(self.batch_size.get() as u64, &attributes);
                PRODUCE_API_DURATION.record(
                    produce_start
                        .elapsed()
                        .inspect(|duration| debug!(produce_duration_ms = duration.as_millis()))
                        .map_or(0, |duration| duration.as_millis() as u64),
                    &attributes,
                );
            },
        }

        Ok(!self.token.is_cancelled())
    }
}

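/// A snapshot of cumulative bytes and records sent, taken when metrics are
/// exported.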
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct Observation {
    taken_at: SystemTime,
    bytes_sent: u64,
    record_count: u64,
}

impl AddAssign for Observation {
    fn add_assign(&mut self, rhs: Self) {
        self.taken_at = self.taken_at.max(rhs.taken_at);
        self.bytes_sent += rhs.bytes_sent;
        self.record_count += rhs.record_count;
    }
}

impl Default for Observation {
    fn default() -> Self {
        Self {
            taken_at: SystemTime::now(),
            bytes_sent: Default::default(),
            record_count: Default::default(),
        }
    }
}

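/// A progress report for one export interval, computed from the current and
/// previous observations: elapsed time, records and bytes sent since the last
/// report, and produce latencies.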
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
struct Info {
    started_at: SystemTime,
    previous: Option<ObservationLatency>,
    current: ObservationLatency,
}

impl Info {
    fn new(started_at: SystemTime) -> Self {
        Self {
            started_at,
            current: Default::default(),
            previous: Default::default(),
        }
    }

    fn with_previous(self, previous: Option<ObservationLatency>) -> Self {
        Self { previous, ..self }
    }

    fn elapsed(&self) -> Duration {
        self.current
            .observation
            .taken_at
            .duration_since(
                self.previous
                    .map_or(self.started_at, |previous| previous.observation.taken_at),
            )
            .expect("duration")
    }

    fn bytes_sent(&self) -> u64 {
        self.current.observation.bytes_sent
            - self
                .previous
                .map(|previous| previous.observation.bytes_sent)
                .unwrap_or_default()
    }

    fn records_sent(&self) -> u64 {
        self.current.observation.record_count
            - self
                .previous
                .map(|previous| previous.observation.record_count)
                .unwrap_or_default()
    }

    fn records_sent_per_second(&self) -> f64 {
        self.records_sent() as f64 / self.elapsed().as_secs() as f64
    }

    fn bandwidth(&self) -> Byte {
        self.bytes_sent()
            .checked_div(self.elapsed().as_secs())
            .map(|throughput| Byte::with_iec_prefix(throughput, Prefix::None))
            .expect("throughput")
    }
}

impl Display for Info {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "elapsed: {}, {} records sent, {:.1} records/s, ({}/s), latency: {} min, {:.1}ms avg, {} max",
            self.elapsed().format_duration(),
            self.records_sent(),
            self.records_sent_per_second(),
            self.bandwidth().format_iec(),
            self.current
                .latency
                .min
                .map(|min| min.format_duration())
                .expect("minimum"),
            self.current.latency.mean.expect("mean"),
            self.current
                .latency
                .max
                .map(|max| max.format_duration())
                .expect("max")
        )
    }
}

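/// Minimum, maximum and mean produce latency extracted from the exported
/// histogram data points.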
#[derive(Clone, Copy, Debug, Default, PartialEq, PartialOrd)]
struct Latency {
    min: Option<Duration>,
    max: Option<Duration>,
    mean: Option<f64>,
}

impl From<&Histogram<u64>> for Latency {
    fn from(histogram: &Histogram<u64>) -> Self {
        let min = histogram
            .data_points()
            .filter_map(|dp| dp.min())
            .min()
            .map(Duration::from_millis);

        let max = histogram
            .data_points()
            .filter_map(|dp| dp.max())
            .max()
            .map(Duration::from_millis);

        let sum = histogram.data_points().map(|dp| dp.sum()).sum::<u64>() as f64;
        let count = histogram.data_points().map(|dp| dp.count()).sum::<u64>() as f64;

        let mean = Some(sum / count);

        Self { min, max, mean }
    }
}

#[derive(Clone, Copy, Debug, Default, PartialEq, PartialOrd)]
struct ObservationLatency {
    observation: Observation,
    latency: Latency,
}

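/// Push exporter that, instead of shipping metrics anywhere, folds them into
/// an [`Info`] report and prints one line per export interval.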
#[derive(Debug)]
struct MetricExporter {
    started_at: SystemTime,
    temporality: Temporality,
    previous: Mutex<Option<ObservationLatency>>,
    cancellation: CancellationToken,
}

impl MetricExporter {
    fn new(cancellation: CancellationToken) -> Self {
        let started_at = SystemTime::now();
        Self {
            started_at,
            temporality: Default::default(),
            previous: Default::default(),
            cancellation,
        }
    }

    #[instrument(skip_all, fields(scope = scope.name(), metric = metric.name()))]
    fn info(&self, scope: &InstrumentationScope, metric: &Metric, info: &mut Info) {
        match (scope.name(), metric.name(), metric.data()) {
            ("tansu-client", "tcp_bytes_sent", AggregatedMetrics::U64(MetricData::Sum(sum))) => {
                for (point, data) in sum.data_points().enumerate() {
                    debug!(point, value = ?data.value());
                }

                info.current.observation.bytes_sent =
                    sum.data_points().map(|sum| sum.value()).sum::<u64>();
            }

            (
                "tansu-perf",
                "produce_record_count",
                AggregatedMetrics::U64(MetricData::Sum(sum)),
            ) => {
                for (point, data) in sum.data_points().enumerate() {
                    debug!(point, value = ?data.value());
                }

                info.current.observation.record_count =
                    sum.data_points().map(|sum| sum.value()).sum::<u64>();
            }

            (
                "tansu-perf",
                "produce_duration",
                AggregatedMetrics::U64(MetricData::Histogram(histogram)),
            ) => {
                info.current.latency = Latency::from(histogram);
            }

            _ => (),
        }
    }
}

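// Each export prints the delta since the previous observation; once the run
// has been cancelled, a final line based on the last stored observation is
// printed instead.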
impl PushMetricExporter for MetricExporter {
    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
        let cancelled = self.cancellation.is_cancelled();

        if cancelled {
            if let Some(previous) = *self.previous.lock().expect("previous") {
                let mut info = Info::new(self.started_at);
                info.current = previous;

                println!("{}", info);
            }
        } else {
            let mut previous = self.previous.lock().expect("previous");

            let mut info = Info::new(self.started_at).with_previous(previous.take());

            for scope in metrics.scope_metrics() {
                debug!(scope = scope.scope().name());

                for metric in scope.metrics() {
                    debug!(scope = scope.scope().name(), metric = metric.name());

                    self.info(scope.scope(), metric, &mut info);
                }
            }

            println!("{info}");

            _ = previous.replace(info.current);
        }

        Ok(())
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    #[instrument]
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        Ok(())
    }

    fn temporality(&self) -> Temporality {
        self.temporality
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn add_assign_observation() {
        let now = SystemTime::now();
        let delta = Duration::from_secs(5);

        let previous = Observation {
            taken_at: now.checked_sub(delta).expect("previous"),
            bytes_sent: 32_123,
            record_count: 12_321,
        };

        let mut current = Observation {
            taken_at: now,
            bytes_sent: 43_234,
            record_count: 54_345,
        };

        current += previous;

        assert_eq!(now, current.taken_at);
        assert_eq!(75_357, current.bytes_sent);
        assert_eq!(66_666, current.record_count);
    }

    #[test]
    fn middle_observation() {
        let now = SystemTime::now();
        let elapsed = Duration::from_secs(4);

        let previous = {
            let observation = Observation {
                taken_at: now.checked_sub(elapsed).expect("previous"),
                bytes_sent: 43_234,
                record_count: 212,
            };

            ObservationLatency {
                observation,
                latency: Default::default(),
            }
        };

        let mut info = Info::new(now).with_previous(Some(previous));

        info.current = {
            let observation = Observation {
                taken_at: now,
                bytes_sent: 65_456,
                record_count: 656,
            };

            ObservationLatency {
                observation,
                latency: Default::default(),
            }
        };

        assert_eq!(elapsed, info.elapsed());
        assert_eq!(5_555, info.bandwidth().0);
        assert_eq!(111f64, info.records_sent_per_second());
    }

    #[test]
    fn last_or_first_observation() {
        let now = SystemTime::now();
        let elapsed = Duration::from_secs(4);

        let mut info = Info::new(now.checked_sub(elapsed).expect("elapsed"));

        info.current = {
            let observation = Observation {
                taken_at: now,
                bytes_sent: 65_456,
                record_count: 656,
            };

            ObservationLatency {
                observation,
                latency: Default::default(),
            }
        };

        assert_eq!(elapsed, info.elapsed());
        assert_eq!(16_364, info.bandwidth().0);
        assert_eq!(164f64, info.records_sent_per_second());
    }
}