tansu_generator/lib.rs

// Copyright © 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Tansu Message Generator
//!
//! Generate fake data for schema backed topics.
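//!
//! A minimal usage sketch, assuming the crate is available as `tansu_generator`,
//! a broker listening on `tcp://localhost:9092`, and a topic named `people`
//! registered in a file based schema registry (adjust the URLs and names for
//! your environment):
//!
//! ```no_run
//! use tansu_generator::{Generate, Result};
//! use url::Url;
//!
//! async fn example() -> Result<()> {
//!     let generate = Generate::builder()
//!         .broker(Url::parse("tcp://localhost:9092")?)
//!         .topic("people")
//!         .partition(0)
//!         .schema_registry(Url::parse("file:///etc/schema")?)
//!         .batch_size(10)
//!         .producers(1)
//!         .build()?;
//!
//!     // runs until the optional duration elapses or a signal is received
//!     let _error_code = generate.main().await?;
//!     Ok(())
//! }
//! ```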

use std::{
    fmt, io,
    marker::PhantomData,
    num::NonZeroU32,
    pin::Pin,
    result,
    sync::{Arc, LazyLock, PoisonError},
    time::{Duration, SystemTime},
};

use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
use nonzero_ext::nonzero;
use opentelemetry::{
    InstrumentationScope, KeyValue, global,
    metrics::{Counter, Histogram, Meter},
};
use opentelemetry_otlp::ExporterBuildError;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tansu_client::{Client, ConnectionManager};
use tansu_otel::meter_provider;
use tansu_sans_io::{
    ErrorCode, ProduceRequest,
    primitive::ByteSize,
    produce_request::{PartitionProduceData, TopicProduceData},
    record::{deflated, inflated},
};
use tansu_schema::{Generator as _, Registry, Schema};
use tokio::{
    signal::unix::{SignalKind, signal},
    task::JoinSet,
    time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, span};
use url::Url;

pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});

pub type Result<T, E = Error> = result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub enum Error {
    Api(ErrorCode),
    Client(#[from] tansu_client::Error),
    ExporterBuild(#[from] ExporterBuildError),
    InsufficientCapacity(#[from] InsufficientCapacity),
    Io(Arc<io::Error>),
    Otel(#[from] tansu_otel::Error),
    OtelSdk(#[from] OTelSdkError),
    Poison,
    Protocol(#[from] tansu_sans_io::Error),
    Schema(Box<tansu_schema::Error>),
    SchemaNotFoundForTopic(String),
    UnknownHost(String),
    Url(#[from] url::ParseError),
}

impl<T> From<PoisonError<T>> for Error {
    fn from(_value: PoisonError<T>) -> Self {
        Self::Poison
    }
}

impl From<tansu_schema::Error> for Error {
    fn from(error: tansu_schema::Error) -> Self {
        Self::Schema(Box::new(error))
    }
}

impl From<io::Error> for Error {
    fn from(value: io::Error) -> Self {
        Self::Io(Arc::new(value))
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}

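/// Why a generator run was cancelled: an interrupt or terminate signal, or
/// the configured duration elapsing (timeout).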
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CancelKind {
    Interrupt,
    Terminate,
    Timeout,
}

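/// Settings for a generator run: where to produce (broker, topic, partition),
/// where to find the schema (schema registry), and how hard to push
/// (batch size, rate limits, number of producers, optional duration).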
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Configuration {
    broker: Url,
    topic: String,
    partition: i32,
    schema_registry: Url,
    batch_size: u32,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
    otlp_endpoint_url: Option<Url>,
}

#[derive(Clone, Debug)]
pub struct Generate {
    configuration: Configuration,
    registry: Registry,
}

impl TryFrom<Configuration> for Generate {
    type Error = Error;

    fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
        Registry::builder_try_from_url(&configuration.schema_registry)
            .map(|builder| builder.build())
            .map(|registry| Self {
                configuration,
                registry,
            })
            .map_err(Into::into)
    }
}

static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("generate_produce_batch_duration")
        .with_unit("ms")
        .with_description("Time to generate a produce batch in milliseconds")
        .build()
});

static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("produce_request_response_duration")
        .with_unit("ms")
        .with_description("Latency of receiving a produce response in milliseconds")
        .build()
});

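/// Build a deflated record batch of `batch_size` generated records for the
/// given topic `name` and partition `index`, recording how long generation
/// took in the `generate_produce_batch_duration` histogram.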
fn frame(name: String, index: i32, schema: Schema, batch_size: i32) -> Result<deflated::Frame> {
    let attributes = [
        KeyValue::new("topic", name.clone()),
        KeyValue::new("partition", index.to_string()),
        KeyValue::new("batch_size", batch_size.to_string()),
    ];

    let start = SystemTime::now();

    let mut batch = inflated::Batch::builder();
    let offset_deltas = 0..batch_size;

    for offset_delta in offset_deltas {
        batch = schema
            .generate()
            .map(|record| record.offset_delta(offset_delta))
            .map(|record| batch.record(record))?;
    }

    batch
        .last_offset_delta(batch_size)
        .build()
        .map(|batch| inflated::Frame {
            batches: vec![batch],
        })
        .and_then(deflated::Frame::try_from)
        .inspect(|_| {
            GENERATE_PRODUCE_BATCH_DURATION.record(
                start
                    .elapsed()
                    .map_or(0, |duration| duration.as_millis() as u64),
                &attributes,
            )
        })
        .map_err(Into::into)
}

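/// Send a single produce request carrying `frame` to partition `index` of
/// topic `name`, record the request/response latency, and assert that every
/// partition response came back without an error code.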
pub async fn produce(
    client: Client,
    name: String,
    index: i32,
    batch_size: i32,
    frame: deflated::Frame,
) -> Result<()> {
    debug!(?client, %name, index, batch_size);

    let attributes = [
        KeyValue::new("topic", name.clone()),
        KeyValue::new("partition", index.to_string()),
        KeyValue::new("batch_size", batch_size.to_string()),
    ];

    let req = ProduceRequest::default().topic_data(Some(
        [TopicProduceData::default().name(name).partition_data(Some(
            [PartitionProduceData::default()
                .index(index)
                .records(Some(frame))]
            .into(),
        ))]
        .into(),
    ));

    let start = SystemTime::now();

    let response = client.call(req).await.inspect(|_| {
        PRODUCE_REQUEST_RESPONSE_DURATION.record(
            start
                .elapsed()
                .map_or(0, |duration| duration.as_millis() as u64),
            &attributes,
        )
    })?;

    assert!(
        response
            .responses
            .unwrap_or_default()
            .into_iter()
            .all(|topic| {
                topic
                    .partition_responses
                    .unwrap_or_default()
                    .iter()
                    .inspect(|partition| debug!(topic = %topic.name, ?partition))
                    .all(|partition| partition.error_code == i16::from(ErrorCode::None))
            })
    );

    Ok(())
}

static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("rate_limit_duration")
        .with_unit("ms")
        .with_description("Rate limit latencies in milliseconds")
        .build()
});

static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("produce_record_count")
        .with_description("Produced record count")
        .build()
});

static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("produce_duration")
        .with_unit("ms")
        .with_description("Produce API latencies in milliseconds")
        .build()
});

impl Generate {
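    /// Run the generator: resolve the topic's schema from the registry, spawn
    /// the configured number of producers (each generating and producing
    /// batches, optionally rate limited), and keep going until the optional
    /// duration elapses, all producers finish, or an interrupt/terminate
    /// signal arrives. Metrics are flushed before remaining tasks are aborted.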
    pub async fn main(self) -> Result<ErrorCode> {
        debug!(configuration = ?self.configuration);

        let meter_provider = self
            .configuration
            .otlp_endpoint_url
            .map_or(Ok(None), |otlp_endpoint_url| {
                meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
            })?;

        let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
            return Err(Error::SchemaNotFoundForTopic(
                self.configuration.topic.clone(),
            ));
        };

        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
        debug!(?interrupt_signal);

        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
        debug!(?terminate_signal);

        let rate_limiter = self
            .configuration
            .per_second
            .or(self.configuration.throughput)
            .and_then(NonZeroU32::new)
            .map(Quota::per_second)
            .map(RateLimiter::direct)
            .map(Arc::new)
            .inspect(|rate_limiter| debug!(?rate_limiter));

        let batch_size = NonZeroU32::new(self.configuration.batch_size)
            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
            .unwrap_or(nonzero!(10u32));

        let mut set = JoinSet::new();

        let token = CancellationToken::new();

        let client = ConnectionManager::builder(self.configuration.broker)
            .client_id(Some(env!("CARGO_PKG_NAME").into()))
            .build()
            .await
            .inspect(|pool| debug!(?pool))
            .map(Client::new)?;

        for producer in 0..self.configuration.producers {
            let rate_limiter = rate_limiter.clone();
            let schema = schema.clone();
            let topic = self.configuration.topic.clone();
            let partition = self.configuration.partition;
            let token = token.clone();
            let client = client.clone();

            _ = set.spawn(async move {
                let span = span!(Level::DEBUG, "producer", producer);

                async move {
                    let attributes = [KeyValue::new("producer", producer.to_string())];

                    loop {
                        debug!(%topic, partition);

                        let Ok(frame) = frame(
                            topic.clone(),
                            partition,
                            schema.clone(),
                            batch_size.get() as i32,
                        ) else {
                            break;
                        };

                        if let Some(ref rate_limiter) = rate_limiter {
                            let rate_limit_start = SystemTime::now();

                            // With a throughput limit, the rate limiter cells are the frame's
                            // size in bytes (falling back to the record count if the size is
                            // unavailable); otherwise they are the records in the batch.
                            let cells = self
                                .configuration
                                .throughput
                                .and(
                                    frame
                                        .size_in_bytes()
                                        .ok()
                                        .and_then(|bytes| NonZeroU32::new(bytes as u32)),
                                )
                                .unwrap_or(batch_size);
                            debug!(cells);

                            tokio::select! {
                                cancelled = token.cancelled() => {
                                    debug!(?cancelled);
                                    break
                                },

                                Ok(_) = rate_limiter.until_n_ready_with_jitter(
                                    cells,
                                    Jitter::up_to(Duration::from_millis(50)),
                                ) => {
                                    RATE_LIMIT_DURATION.record(
                                        rate_limit_start
                                            .elapsed()
                                            .inspect(|duration| {
                                                debug!(rate_limit_duration_ms = duration.as_millis())
                                            })
                                            .map_or(0, |duration| duration.as_millis() as u64),
                                        &attributes,
                                    )
                                },
                            }
                        }

                        let produce_start = SystemTime::now();

                        tokio::select! {
                            cancelled = token.cancelled() => {
                                debug!(?cancelled);
                                break
                            },

                            Ok(_) = produce(
                                client.clone(),
                                topic.clone(),
                                partition,
                                batch_size.get() as i32,
                                frame,
                            ) => {
                                PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
                                PRODUCE_API_DURATION.record(
                                    produce_start
                                        .elapsed()
                                        .inspect(|duration| {
                                            debug!(produce_duration_ms = duration.as_millis())
                                        })
                                        .map_or(0, |duration| duration.as_millis() as u64),
                                    &attributes,
                                );
                            },
                        }
                    }
                }
                .instrument(span)
                .await;
            });
        }

        let join_all = async {
            while !set.is_empty() {
                debug!(len = set.len());
                _ = set.join_next().await;
            }
        };

        let duration = self
            .configuration
            .duration
            .map(sleep)
            .map(Box::pin)
            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
            .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);

        let cancellation = tokio::select! {
            timeout = duration => {
                debug!(?timeout);
                token.cancel();
                Some(CancelKind::Timeout)
            }

            completed = join_all => {
                debug!(?completed);
                None
            }

            interrupt = interrupt_signal.recv() => {
                debug!(?interrupt);
                Some(CancelKind::Interrupt)
            }

            terminate = terminate_signal.recv() => {
                debug!(?terminate);
                Some(CancelKind::Terminate)
            }
        };

        debug!(?cancellation);

        if let Some(meter_provider) = meter_provider {
            meter_provider
                .force_flush()
                .inspect(|force_flush| debug!(?force_flush))?;

            meter_provider
                .shutdown()
                .inspect(|shutdown| debug!(?shutdown))?;
        }

        if let Some(CancelKind::Timeout) = cancellation {
            sleep(Duration::from_secs(5)).await;
        }

        debug!(abort = set.len());
        set.abort_all();

        while !set.is_empty() {
            _ = set.join_next().await;
        }

        Ok(ErrorCode::None)
    }

    pub fn builder()
    -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
        Builder::default()
    }
}

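/// Typestate builder for [`Generate`]: `broker`, `topic`, `partition` and
/// `schema_registry` start as `PhantomData` placeholders and must all be set
/// before [`Builder::build`] becomes available.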
#[derive(Clone, Debug)]
pub struct Builder<B, T, P, S> {
    broker: B,
    topic: T,
    partition: P,
    schema_registry: S,
    batch_size: u32,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
    otlp_endpoint_url: Option<Url>,
}

impl Default
    for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
{
    fn default() -> Self {
        Self {
            broker: Default::default(),
            topic: Default::default(),
            partition: Default::default(),
            schema_registry: Default::default(),
            batch_size: 1,
            per_second: None,
            throughput: None,
            producers: 1,
            duration: None,
            otlp_endpoint_url: None,
        }
    }
}

impl<B, T, P, S> Builder<B, T, P, S> {
    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
        Builder {
            broker: broker.into(),
            topic: self.topic,
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
        Builder {
            broker: self.broker,
            topic: topic.into(),
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
        Builder {
            broker: self.broker,
            topic: self.topic,
            partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
        Builder {
            broker: self.broker,
            topic: self.topic,
            partition: self.partition,
            schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn batch_size(self, batch_size: u32) -> Self {
        Self { batch_size, ..self }
    }

    pub fn per_second(self, per_second: Option<u32>) -> Self {
        Self { per_second, ..self }
    }

    pub fn throughput(self, throughput: Option<u32>) -> Self {
        Self { throughput, ..self }
    }

    pub fn producers(self, producers: u32) -> Self {
        Self { producers, ..self }
    }

    pub fn duration(self, duration: Option<Duration>) -> Self {
        Self { duration, ..self }
    }

    pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Self {
        Self {
            otlp_endpoint_url,
            ..self
        }
    }
}

impl Builder<Url, String, i32, Url> {
    pub fn build(self) -> Result<Generate> {
        Generate::try_from(Configuration {
            broker: self.broker,
            topic: self.topic,
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        })
    }
}