tansu_perf/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::{
16    fmt::{self, Debug, Display},
17    result,
18};
19use std::{
20    io,
21    marker::PhantomData,
22    num::{NonZero, NonZeroU32},
23    ops::AddAssign,
24    pin::Pin,
25    sync::{Arc, LazyLock, Mutex, PoisonError},
26    time::{Duration, SystemTime},
27};
28
29use bytes::Bytes;
30use governor::{DefaultDirectRateLimiter, InsufficientCapacity, Jitter, Quota, RateLimiter};
31use human_units::{
32    FormatDuration,
33    iec::{Byte, Prefix},
34};
35use nonzero_ext::nonzero;
36use opentelemetry::{
37    InstrumentationScope, KeyValue, global,
38    metrics::{Counter, Meter},
39};
40use opentelemetry_otlp::ExporterBuildError;
41use opentelemetry_sdk::{
42    error::{OTelSdkError, OTelSdkResult},
43    metrics::{
44        SdkMeterProvider, Temporality,
45        data::{AggregatedMetrics, Histogram, Metric, MetricData, ResourceMetrics},
46        exporter::PushMetricExporter,
47    },
48};
49use opentelemetry_semantic_conventions::SCHEMA_URL;
50use tansu_client::{Client, ConnectionManager};
51use tansu_sans_io::{
52    ErrorCode, ProduceRequest,
53    primitive::ByteSize as _,
54    produce_request::{PartitionProduceData, TopicProduceData},
55    record::{Record, deflated, inflated},
56};
57use tokio::{
58    signal::unix::{SignalKind, signal},
59    task::JoinSet,
60    time::sleep,
61};
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, instrument};
64use url::Url;
65
66pub type Result<T, E = Error> = result::Result<T, E>;
67
68pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
69    global::meter_with_scope(
70        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
71            .with_version(env!("CARGO_PKG_VERSION"))
72            .with_schema_url(SCHEMA_URL)
73            .build(),
74    )
75});
76
77#[derive(thiserror::Error, Debug)]
78pub enum Error {
79    Api(ErrorCode),
80    Client(#[from] tansu_client::Error),
81    ExporterBuild(#[from] ExporterBuildError),
82    InsufficientCapacity(#[from] InsufficientCapacity),
83    Io(Arc<io::Error>),
84    OtelSdk(#[from] OTelSdkError),
85    Random(#[from] getrandom::Error),
86    Poison,
87    Protocol(#[from] tansu_sans_io::Error),
88    UnknownHost(String),
89    Url(#[from] url::ParseError),
90}
91
92impl<T> From<PoisonError<T>> for Error {
93    fn from(_value: PoisonError<T>) -> Self {
94        Self::Poison
95    }
96}
97
98impl From<io::Error> for Error {
99    fn from(value: io::Error) -> Self {
100        Self::Io(Arc::new(value))
101    }
102}
103
104impl Display for Error {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        write!(f, "{self:?}")
107    }
108}
109
110#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
111pub enum CancelKind {
112    Interrupt,
113    Terminate,
114    Timeout,
115}
116
117#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
118pub struct Perf {
119    broker: Url,
120    topic: String,
121    partition: i32,
122    batch_size: u32,
123    record_size: usize,
124    per_second: Option<u32>,
125    throughput: Option<u32>,
126    producers: u32,
127    duration: Option<Duration>,
128}
129
130#[derive(Clone, Debug)]
131pub struct Builder<B, T> {
132    broker: B,
133    topic: T,
134    partition: i32,
135    batch_size: u32,
136    record_size: usize,
137    per_second: Option<u32>,
138    throughput: Option<u32>,
139    producers: u32,
140    duration: Option<Duration>,
141}
142
143impl Default for Builder<PhantomData<Url>, PhantomData<String>> {
144    fn default() -> Self {
145        Self {
146            broker: Default::default(),
147            topic: Default::default(),
148            partition: Default::default(),
149            batch_size: 1,
150            record_size: 1024,
151            per_second: None,
152            throughput: None,
153            producers: 1,
154            duration: None,
155        }
156    }
157}
158
159impl<B, T> Builder<B, T> {
160    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T> {
161        Builder {
162            broker: broker.into(),
163            topic: self.topic,
164            partition: self.partition,
165            batch_size: self.batch_size,
166            record_size: self.record_size,
167            per_second: self.per_second,
168            throughput: self.throughput,
169            producers: self.producers,
170            duration: self.duration,
171        }
172    }
173
174    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String> {
175        Builder {
176            broker: self.broker,
177            topic: topic.into(),
178            partition: self.partition,
179            batch_size: self.batch_size,
180            record_size: self.record_size,
181            per_second: self.per_second,
182            throughput: self.throughput,
183            producers: self.producers,
184            duration: self.duration,
185        }
186    }
187
188    pub fn partition(self, partition: i32) -> Builder<B, T> {
189        Self { partition, ..self }
190    }
191
192    pub fn batch_size(self, batch_size: u32) -> Self {
193        Self { batch_size, ..self }
194    }
195
196    pub fn record_size(self, record_size: usize) -> Self {
197        Self {
198            record_size,
199            ..self
200        }
201    }
202
203    pub fn per_second(self, per_second: Option<u32>) -> Self {
204        Self { per_second, ..self }
205    }
206
207    pub fn throughput(self, throughput: Option<u32>) -> Self {
208        Self { throughput, ..self }
209    }
210
211    pub fn producers(self, producers: u32) -> Self {
212        Self { producers, ..self }
213    }
214
215    pub fn duration(self, duration: Option<Duration>) -> Self {
216        Self { duration, ..self }
217    }
218}
219
220impl Builder<Url, String> {
221    pub fn build(self) -> Perf {
222        Perf {
223            broker: self.broker,
224            topic: self.topic,
225            partition: self.partition,
226            batch_size: self.batch_size,
227            record_size: self.record_size,
228            per_second: self.per_second,
229            throughput: self.throughput,
230            producers: self.producers,
231            duration: self.duration,
232        }
233    }
234}
235
236static RATE_LIMIT_DURATION: LazyLock<opentelemetry::metrics::Histogram<u64>> =
237    LazyLock::new(|| {
238        METER
239            .u64_histogram("rate_limit_duration")
240            .with_unit("ms")
241            .with_description("Rate limit latencies in milliseconds")
242            .build()
243    });
244
245static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
246    METER
247        .u64_counter("produce_record_count")
248        .with_description("Produced record count")
249        .build()
250});
251
252static PRODUCE_API_DURATION: LazyLock<opentelemetry::metrics::Histogram<u64>> =
253    LazyLock::new(|| {
254        METER
255            .u64_histogram("produce_duration")
256            .with_unit("ms")
257            .with_description("Produce API latencies in milliseconds")
258            .build()
259    });
260
261impl Perf {
262    pub fn builder() -> Builder<PhantomData<Url>, PhantomData<String>> {
263        Builder::default()
264    }
265
266    pub async fn main(self) -> Result<ErrorCode> {
267        let token = CancellationToken::new();
268
269        let meter_provider = {
270            let exporter = MetricExporter::new(token.clone());
271            let meter_provider = SdkMeterProvider::builder()
272                .with_periodic_exporter(exporter)
273                .build();
274            global::set_meter_provider(meter_provider.clone());
275
276            meter_provider
277        };
278
279        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
280        debug!(?interrupt_signal);
281
282        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
283        debug!(?terminate_signal);
284
285        let rate_limiter = self
286            .per_second
287            .or(self.throughput)
288            .inspect(|limit| debug!(?limit))
289            .and_then(NonZeroU32::new)
290            .map(Quota::per_second)
291            .map(RateLimiter::direct)
292            .map(Arc::new)
293            .inspect(|rate_limiter| debug!(?rate_limiter));
294
295        let record_data = {
296            let mut data = vec![0u8; self.record_size];
297            getrandom::fill(&mut data)?;
298            Bytes::from(data)
299        };
300
301        let batch_size = NonZeroU32::new(self.batch_size)
302            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
303            .unwrap_or(nonzero!(10u32));
304
305        let mut set = JoinSet::new();
306
307        let client = ConnectionManager::builder(self.broker)
308            .client_id(Some(env!("CARGO_PKG_NAME").into()))
309            .build()
310            .await
311            .inspect(|pool| debug!(?pool))
312            .map(Client::new)?;
313
314        for id in 0..self.producers {
315            let producer = Producer {
316                id,
317                rate_limiter: rate_limiter.clone(),
318                topic: self.topic.clone(),
319                partition: self.partition,
320                record_data: record_data.clone(),
321                token: token.clone(),
322                client: client.clone(),
323                batch_size,
324                throughput: self.throughput,
325            };
326
327            _ = set.spawn(async move {
328                loop {
329                    match producer.rate_limited().await {
330                        Ok(false) | Err(_) => break,
331                        _ => continue,
332                    }
333                }
334            });
335        }
336
337        let join_all = async {
338            while !set.is_empty() {
339                debug!(len = set.len());
340                _ = set.join_next().await;
341            }
342        };
343
344        let duration = self
345            .duration
346            .map(sleep)
347            .map(Box::pin)
348            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
349            .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);
350
351        let cancellation = tokio::select! {
352
353            timeout = duration => {
354                debug!(?timeout);
355                token.cancel();
356                Some(CancelKind::Timeout)
357            }
358
359            completed = join_all => {
360                debug!(?completed);
361                None
362            }
363
364            interrupt = interrupt_signal.recv() => {
365                debug!(?interrupt);
366                Some(CancelKind::Interrupt)
367            }
368
369            terminate = terminate_signal.recv() => {
370                debug!(?terminate);
371                Some(CancelKind::Terminate)
372            }
373
374        };
375
376        debug!(?cancellation);
377
378        meter_provider
379            .shutdown()
380            .inspect(|shutdown| debug!(?shutdown))?;
381
382        if let Some(CancelKind::Timeout) = cancellation {
383            sleep(Duration::from_secs(5)).await;
384        }
385
386        debug!(abort = set.len());
387        set.abort_all();
388
389        while !set.is_empty() {
390            _ = set.join_next().await;
391        }
392
393        Ok(ErrorCode::None)
394    }
395}
396
397#[derive(Clone, Debug)]
398struct Producer {
399    id: u32,
400    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
401    topic: String,
402    partition: i32,
403    record_data: Bytes,
404    token: CancellationToken,
405    client: Client,
406    batch_size: NonZero<u32>,
407    throughput: Option<u32>,
408}
409
410impl Producer {
411    #[instrument(skip_all, fields(record_data_len = self.record_data.len()))]
412    fn frame(&self) -> Result<deflated::Frame> {
413        let mut batch = inflated::Batch::builder();
414        let offset_deltas = 0..(self.batch_size.get() as i32);
415
416        for offset_delta in offset_deltas {
417            batch = batch.record(
418                Record::builder()
419                    .value(Some(self.record_data.clone()))
420                    .offset_delta(offset_delta),
421            )
422        }
423
424        batch
425            .last_offset_delta(self.batch_size.get() as i32)
426            .build()
427            .map(|batch| inflated::Frame {
428                batches: vec![batch],
429            })
430            .and_then(deflated::Frame::try_from)
431            .map_err(Into::into)
432    }
433
434    #[instrument(skip_all)]
435    async fn produce(&self, frame: deflated::Frame) -> Result<()> {
436        let req = ProduceRequest::default().topic_data(Some(
437            [TopicProduceData::default()
438                .name(self.topic.clone())
439                .partition_data(Some(
440                    [PartitionProduceData::default()
441                        .index(self.partition)
442                        .records(Some(frame))]
443                    .into(),
444                ))]
445            .into(),
446        ));
447
448        let response = self.client.call(req).await?;
449
450        assert!(
451            response
452                .responses
453                .unwrap_or_default()
454                .into_iter()
455                .all(|topic| {
456                    topic
457                        .partition_responses
458                        .unwrap_or_default()
459                        .iter()
460                        .all(|partition| partition.error_code == i16::from(ErrorCode::None))
461                })
462        );
463
464        Ok(())
465    }
466
467    #[instrument(skip_all, fields(id = self.id))]
468    async fn rate_limited(&self) -> Result<bool> {
469        let attributes = [KeyValue::new("producer", self.id.to_string())];
470
471        let frame = self.frame()?;
472
473        if let Some(ref rate_limiter) = self.rate_limiter {
474            let rate_limit_start = SystemTime::now();
475
476            let cells = self
477                .throughput
478                .and(
479                    frame
480                        .size_in_bytes()
481                        .ok()
482                        .and_then(|bytes| NonZeroU32::new(bytes as u32)),
483                )
484                .unwrap_or(self.batch_size);
485
486            tokio::select! {
487                cancelled = self.token.cancelled() => {
488                    debug!(?cancelled);
489                    return Ok(false)
490                },
491
492                Ok(_) = rate_limiter.until_n_ready_with_jitter(cells, Jitter::up_to(Duration::from_millis(10))) => {
493                    RATE_LIMIT_DURATION.record(
494                    rate_limit_start
495                        .elapsed()
496                        .inspect(|duration|debug!(rate_limit_duration_ms = duration.as_millis()))
497                        .map_or(0, |duration| duration.as_millis() as u64),
498                        &attributes)
499
500                },
501            }
502        }
503
504        let produce_start = SystemTime::now();
505
506        tokio::select! {
507            cancelled = self.token.cancelled() => {
508                debug!(?cancelled);
509                return Ok(false)
510            },
511
512            Ok(_) = self.produce(frame) => {
513                PRODUCE_RECORD_COUNT.add(self.batch_size.get() as u64, &attributes);
514                PRODUCE_API_DURATION.record(produce_start.elapsed().inspect(|duration|debug!(produce_duration_ms = duration.as_millis())).map_or(0, |duration| duration.as_millis() as u64), &attributes);
515            },
516        }
517
518        Ok(!self.token.is_cancelled())
519    }
520}
521
522#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
523struct Observation {
524    taken_at: SystemTime,
525    bytes_sent: u64,
526    record_count: u64,
527}
528
529impl AddAssign for Observation {
530    fn add_assign(&mut self, rhs: Self) {
531        self.taken_at = self.taken_at.max(rhs.taken_at);
532        self.bytes_sent += rhs.bytes_sent;
533        self.record_count += rhs.record_count;
534    }
535}
536
537impl Default for Observation {
538    fn default() -> Self {
539        Self {
540            taken_at: SystemTime::now(),
541            bytes_sent: Default::default(),
542            record_count: Default::default(),
543        }
544    }
545}
546
547#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
548struct Info {
549    started_at: SystemTime,
550    previous: Option<ObservationLatency>,
551    current: ObservationLatency,
552}
553
554impl Info {
555    fn new(started_at: SystemTime) -> Self {
556        Self {
557            started_at,
558            current: Default::default(),
559            previous: Default::default(),
560        }
561    }
562
563    fn with_previous(self, previous: Option<ObservationLatency>) -> Self {
564        Self { previous, ..self }
565    }
566
567    fn elapsed(&self) -> Duration {
568        self.current
569            .observation
570            .taken_at
571            .duration_since(
572                self.previous
573                    .map_or(self.started_at, |previous| previous.observation.taken_at),
574            )
575            .expect("duration")
576    }
577
578    fn bytes_sent(&self) -> u64 {
579        self.current.observation.bytes_sent
580            - self
581                .previous
582                .map(|previous| previous.observation.bytes_sent)
583                .unwrap_or_default()
584    }
585
586    fn records_sent(&self) -> u64 {
587        self.current.observation.record_count
588            - self
589                .previous
590                .map(|previous| previous.observation.record_count)
591                .unwrap_or_default()
592    }
593
594    fn records_sent_per_second(&self) -> f64 {
595        self.records_sent() as f64 / self.elapsed().as_secs() as f64
596    }
597
598    fn bandwidth(&self) -> Byte {
599        self.bytes_sent()
600            .checked_div(self.elapsed().as_secs())
601            .map(|throughput| Byte::with_iec_prefix(throughput, Prefix::None))
602            .expect("throughput")
603    }
604}
605
606impl Display for Info {
607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
608        write!(
609            f,
610            "elapsed: {}, {} records sent, {:.1} records/s, ({}/s), latency: {} min, {:.1}ms avg, {} max",
611            self.elapsed().format_duration(),
612            self.records_sent(),
613            self.records_sent_per_second(),
614            self.bandwidth().format_iec(),
615            self.current
616                .latency
617                .min
618                .map(|min| min.format_duration())
619                .expect("minimum"),
620            self.current.latency.mean.expect("mean"),
621            self.current
622                .latency
623                .max
624                .map(|max| max.format_duration())
625                .expect("max")
626        )
627    }
628}
629
630#[derive(Clone, Copy, Debug, Default, PartialEq, PartialOrd)]
631struct Latency {
632    min: Option<Duration>,
633    max: Option<Duration>,
634    mean: Option<f64>,
635}
636
637impl From<&Histogram<u64>> for Latency {
638    fn from(histogram: &Histogram<u64>) -> Self {
639        let min = histogram
640            .data_points()
641            .filter_map(|dp| dp.min())
642            .min()
643            .map(Duration::from_millis);
644
645        let max = histogram
646            .data_points()
647            .filter_map(|dp| dp.max())
648            .max()
649            .map(Duration::from_millis);
650
651        let sum = histogram.data_points().map(|dp| dp.sum()).sum::<u64>() as f64;
652        let count = histogram.data_points().map(|dp| dp.count()).sum::<u64>() as f64;
653
654        let mean = Some(sum / count);
655
656        Self { min, max, mean }
657    }
658}
659
660#[derive(Clone, Copy, Debug, Default, PartialEq, PartialOrd)]
661struct ObservationLatency {
662    observation: Observation,
663    latency: Latency,
664}
665
666#[derive(Debug)]
667struct MetricExporter {
668    started_at: SystemTime,
669    temporality: Temporality,
670    previous: Mutex<Option<ObservationLatency>>,
671    cancellation: CancellationToken,
672}
673
674impl MetricExporter {
675    fn new(cancellation: CancellationToken) -> Self {
676        let started_at = SystemTime::now();
677        Self {
678            started_at,
679            temporality: Default::default(),
680            previous: Default::default(),
681            cancellation,
682        }
683    }
684
685    #[instrument(skip_all, fields(scope = scope.name(), metric = metric.name()))]
686    fn info(&self, scope: &InstrumentationScope, metric: &Metric, info: &mut Info) {
687        match (scope.name(), metric.name(), metric.data()) {
688            ("tansu-client", "tcp_bytes_sent", AggregatedMetrics::U64(MetricData::Sum(sum))) => {
689                for (point, data) in sum.data_points().enumerate() {
690                    debug!(point, value = ?data.value());
691                }
692
693                info.current.observation.bytes_sent =
694                    sum.data_points().map(|sum| sum.value()).sum::<u64>();
695            }
696
697            (
698                "tansu-perf",
699                "produce_record_count",
700                AggregatedMetrics::U64(MetricData::Sum(sum)),
701            ) => {
702                for (point, data) in sum.data_points().enumerate() {
703                    debug!(point, value = ?data.value());
704                }
705
706                info.current.observation.record_count =
707                    sum.data_points().map(|sum| sum.value()).sum::<u64>();
708            }
709
710            (
711                "tansu-perf",
712                "produce_duration",
713                AggregatedMetrics::U64(MetricData::Histogram(histogram)),
714            ) => {
715                info.current.latency = Latency::from(histogram);
716            }
717
718            _ => (),
719        }
720    }
721}
722
723impl PushMetricExporter for MetricExporter {
724    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
725        let cancelled = self.cancellation.is_cancelled();
726
727        if cancelled {
728            if let Some(previous) = *self.previous.lock().expect("previous") {
729                let mut info = Info::new(self.started_at);
730                info.current = previous;
731
732                println!("{}", info);
733            }
734        } else {
735            let mut previous = self.previous.lock().expect("previous");
736
737            let mut info = Info::new(self.started_at).with_previous(previous.take());
738
739            for scope in metrics.scope_metrics() {
740                debug!(scope = scope.scope().name());
741
742                for metric in scope.metrics() {
743                    debug!(scope = scope.scope().name(), metric = metric.name());
744
745                    self.info(scope.scope(), metric, &mut info);
746                }
747            }
748
749            println!("{info}");
750
751            _ = previous.replace(info.current);
752        }
753
754        Ok(())
755    }
756
757    fn force_flush(&self) -> OTelSdkResult {
758        Ok(())
759    }
760
761    #[instrument]
762    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
763        Ok(())
764    }
765
766    fn temporality(&self) -> Temporality {
767        self.temporality
768    }
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774
775    #[test]
776    fn add_assign_observation() {
777        let now = SystemTime::now();
778        let delta = Duration::from_secs(5);
779
780        let previous = Observation {
781            taken_at: now.checked_sub(delta).expect("previous"),
782            bytes_sent: 32_123,
783            record_count: 12_321,
784        };
785
786        let mut current = Observation {
787            taken_at: now,
788            bytes_sent: 43_234,
789            record_count: 54_345,
790        };
791
792        current += previous;
793
794        assert_eq!(now, current.taken_at);
795        assert_eq!(75_357, current.bytes_sent);
796        assert_eq!(66_666, current.record_count);
797    }
798
799    #[test]
800    fn middle_observation() {
801        let now = SystemTime::now();
802        let elapsed = Duration::from_secs(4);
803
804        let previous = {
805            let observation = Observation {
806                taken_at: now.checked_sub(elapsed).expect("previous"),
807                bytes_sent: 43_234,
808                record_count: 212,
809            };
810
811            ObservationLatency {
812                observation,
813                latency: Default::default(),
814            }
815        };
816
817        let mut info = Info::new(now).with_previous(Some(previous));
818
819        info.current = {
820            let observation = Observation {
821                taken_at: now,
822                bytes_sent: 65_456,
823                record_count: 656,
824            };
825
826            ObservationLatency {
827                observation,
828                latency: Default::default(),
829            }
830        };
831
832        assert_eq!(elapsed, info.elapsed());
833        assert_eq!(5_555, info.bandwidth().0);
834        assert_eq!(111f64, info.records_sent_per_second());
835    }
836
837    #[test]
838    fn last_or_first_observation() {
839        let now = SystemTime::now();
840        let elapsed = Duration::from_secs(4);
841
842        let mut info = Info::new(now.checked_sub(elapsed).expect("elapsed"));
843
844        info.current = {
845            let observation = Observation {
846                taken_at: now,
847                bytes_sent: 65_456,
848                record_count: 656,
849            };
850
851            ObservationLatency {
852                observation,
853                latency: Default::default(),
854            }
855        };
856
857        assert_eq!(elapsed, info.elapsed());
858        assert_eq!(16_364, info.bandwidth().0);
859        assert_eq!(164f64, info.records_sent_per_second());
860    }
861}