// tansu_generator/lib.rs

// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Tansu Message Generator
//!
//! Generate fake data for schema backed topics.
18
use std::{
    fmt, io,
    marker::PhantomData,
    num::NonZeroU32,
    pin::Pin,
    result,
    sync::{Arc, LazyLock, PoisonError},
    time::{Duration, Instant, SystemTime},
};

use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
use nonzero_ext::nonzero;
use opentelemetry::{
    InstrumentationScope, KeyValue, global,
    metrics::{Counter, Histogram, Meter},
};
use opentelemetry_otlp::ExporterBuildError;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tansu_client::{Client, ConnectionManager};
use tansu_otel::meter_provider;
use tansu_sans_io::{
    ErrorCode, ProduceRequest,
    produce_request::{PartitionProduceData, TopicProduceData},
    record::{deflated, inflated},
};
use tansu_schema::{Generator as _, Registry, Schema};
use tokio::{
    signal::unix::{SignalKind, signal},
    task::JoinSet,
    time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, span};
use url::Url;
54
55pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
56    global::meter_with_scope(
57        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
58            .with_version(env!("CARGO_PKG_VERSION"))
59            .with_schema_url(SCHEMA_URL)
60            .build(),
61    )
62});
63
64pub type Result<T, E = Error> = result::Result<T, E>;
65
66#[derive(thiserror::Error, Debug)]
67pub enum Error {
68    Api(ErrorCode),
69    Client(#[from] tansu_client::Error),
70    ExporterBuild(#[from] ExporterBuildError),
71    InsufficientCapacity(#[from] InsufficientCapacity),
72    Io(Arc<io::Error>),
73    Otel(#[from] tansu_otel::Error),
74    OtelSdk(#[from] OTelSdkError),
75    Poison,
76    Protocol(#[from] tansu_sans_io::Error),
77    Schema(Box<tansu_schema::Error>),
78    SchemaNotFoundForTopic(String),
79    UnknownHost(String),
80    Url(#[from] url::ParseError),
81}
82
83impl<T> From<PoisonError<T>> for Error {
84    fn from(_value: PoisonError<T>) -> Self {
85        Self::Poison
86    }
87}
88
89impl From<tansu_schema::Error> for Error {
90    fn from(error: tansu_schema::Error) -> Self {
91        Self::Schema(Box::new(error))
92    }
93}
94
95impl From<io::Error> for Error {
96    fn from(value: io::Error) -> Self {
97        Self::Io(Arc::new(value))
98    }
99}
100
101impl fmt::Display for Error {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        write!(f, "{self:?}")
104    }
105}
106
/// Why a generator run was stopped before its producers finished on their own.
///
/// The derived ordering follows declaration order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelKind {
    /// An interrupt signal was received.
    Interrupt,
    /// A terminate signal was received.
    Terminate,
    /// The configured run duration elapsed.
    Timeout,
}
113
114#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
115pub struct Configuration {
116    broker: Url,
117    topic: String,
118    partition: i32,
119    schema_registry: Url,
120    batch_size: u32,
121    per_second: Option<u32>,
122    producers: u32,
123    duration: Option<Duration>,
124    otlp_endpoint_url: Option<Url>,
125}
126
127#[derive(Clone, Debug)]
128pub struct Generate {
129    configuration: Configuration,
130    registry: Registry,
131}
132
133impl TryFrom<Configuration> for Generate {
134    type Error = Error;
135
136    fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
137        Registry::builder_try_from_url(&configuration.schema_registry)
138            .map(|builder| builder.build())
139            .map(|registry| Self {
140                configuration,
141                registry,
142            })
143            .map_err(Into::into)
144    }
145}
146
147static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
148    METER
149        .u64_histogram("generate_produce_batch_duration")
150        .with_unit("ms")
151        .with_description("Generate a produce batch in milliseconds")
152        .build()
153});
154
155static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
156    METER
157        .u64_histogram("produce_request_response_duration")
158        .with_unit("ms")
159        .with_description("Latency of receiving an produce response in milliseconds")
160        .build()
161});
162
163pub async fn produce(
164    client: Client,
165    name: String,
166    index: i32,
167    schema: Schema,
168    batch_size: i32,
169) -> Result<()> {
170    debug!(?client, %name, index, batch_size);
171
172    let attributes = [
173        KeyValue::new("topic", name.clone()),
174        KeyValue::new("partition", index.to_string()),
175        KeyValue::new("batch_size", batch_size.to_string()),
176    ];
177
178    let frame = {
179        let start = SystemTime::now();
180
181        let mut batch = inflated::Batch::builder();
182        let offset_deltas = 0..batch_size;
183
184        for offset_delta in offset_deltas {
185            batch = schema
186                .generate()
187                .map(|record| record.offset_delta(offset_delta))
188                .map(|record| batch.record(record))?;
189        }
190
191        batch
192            .last_offset_delta(batch_size)
193            .build()
194            .map(|batch| inflated::Frame {
195                batches: vec![batch],
196            })
197            .and_then(deflated::Frame::try_from)
198            .inspect(|_| {
199                GENERATE_PRODUCE_BATCH_DURATION.record(
200                    start
201                        .elapsed()
202                        .map_or(0, |duration| duration.as_millis() as u64),
203                    &attributes,
204                )
205            })?
206    };
207
208    let req = ProduceRequest::default().topic_data(Some(
209        [TopicProduceData::default().name(name).partition_data(Some(
210            [PartitionProduceData::default()
211                .index(index)
212                .records(Some(frame))]
213            .into(),
214        ))]
215        .into(),
216    ));
217
218    let start = SystemTime::now();
219
220    let response = client.call(req).await.inspect(|_| {
221        PRODUCE_REQUEST_RESPONSE_DURATION.record(
222            start
223                .elapsed()
224                .map_or(0, |duration| duration.as_millis() as u64),
225            &attributes,
226        )
227    })?;
228
229    assert!(
230        response
231            .responses
232            .unwrap_or_default()
233            .into_iter()
234            .all(|topic| {
235                topic
236                    .partition_responses
237                    .unwrap_or_default()
238                    .iter()
239                    .inspect(|partition| debug!(topic = %topic.name, ?partition))
240                    .all(|partition| partition.error_code == i16::from(ErrorCode::None))
241            })
242    );
243
244    Ok(())
245}
246
247static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
248    METER
249        .u64_histogram("rate_limit_duration")
250        .with_unit("ms")
251        .with_description("Rate limit latencies in milliseconds")
252        .build()
253});
254
255static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
256    METER
257        .u64_counter("produce_record_count")
258        .with_description("Produced record count")
259        .build()
260});
261
262static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
263    METER
264        .u64_histogram("produce_duration")
265        .with_unit("ms")
266        .with_description("Produce API latencies in milliseconds")
267        .build()
268});
269
270impl Generate {
271    pub async fn main(self) -> Result<ErrorCode> {
272        let meter_provider = self
273            .configuration
274            .otlp_endpoint_url
275            .map_or(Ok(None), |otlp_endpoint_url| {
276                meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
277            })?;
278
279        let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
280            return Err(Error::SchemaNotFoundForTopic(
281                self.configuration.topic.clone(),
282            ));
283        };
284
285        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
286        debug!(?interrupt_signal);
287
288        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
289        debug!(?terminate_signal);
290
291        let rate_limiter = self
292            .configuration
293            .per_second
294            .and_then(NonZeroU32::new)
295            .map(Quota::per_second)
296            .map(RateLimiter::direct)
297            .map(Arc::new)
298            .inspect(|rate_limiter| debug!(?rate_limiter));
299
300        let batch_size = NonZeroU32::new(self.configuration.batch_size)
301            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
302            .unwrap_or(nonzero!(10u32));
303
304        let mut set = JoinSet::new();
305
306        let token = CancellationToken::new();
307
308        let client = ConnectionManager::builder(self.configuration.broker)
309            .client_id(Some(env!("CARGO_PKG_NAME").into()))
310            .build()
311            .await
312            .inspect(|pool| debug!(?pool))
313            .map(Client::new)?;
314
315        for producer in 0..self.configuration.producers {
316            let rate_limiter = rate_limiter.clone();
317            let schema = schema.clone();
318            let topic = self.configuration.topic.clone();
319            let partition = self.configuration.partition;
320            let token = token.clone();
321            let client = client.clone();
322
323            _ = set.spawn(async move {
324                    let span = span!(Level::DEBUG, "producer", producer);
325
326                    async move {
327                        let attributes = [KeyValue::new("producer", producer.to_string())];
328
329                        loop {
330                            debug!(%topic, partition);
331
332                            if let Some(ref rate_limiter) = rate_limiter {
333                                let rate_limit_start = SystemTime::now();
334
335                                tokio::select! {
336                                    cancelled = token.cancelled() => {
337                                        debug!(?cancelled);
338                                        break
339                                    },
340
341                                    Ok(_) = rate_limiter.until_n_ready_with_jitter(batch_size, Jitter::up_to(Duration::from_millis(50))) => {
342                                        RATE_LIMIT_DURATION.record(
343                                        rate_limit_start
344                                            .elapsed()
345                                            .map_or(0, |duration| duration.as_millis() as u64),
346                                            &attributes)
347
348                                    },
349                                }
350                            }
351
352                            let produce_start = SystemTime::now();
353
354                            tokio::select! {
355                                cancelled = token.cancelled() => {
356                                    debug!(?cancelled);
357                                    break
358                                },
359
360                                Ok(_) = produce(client.clone(), topic.clone(), partition, schema.clone(), batch_size.get() as i32) => {
361                                    PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
362                                    PRODUCE_API_DURATION.record(produce_start.elapsed().map_or(0, |duration| duration.as_millis() as u64), &attributes);
363                                },
364                            }
365                        }
366
367                    }.instrument(span).await
368
369                });
370        }
371
372        let join_all = async {
373            while !set.is_empty() {
374                debug!(len = set.len());
375                _ = set.join_next().await;
376            }
377        };
378
379        let duration = self
380            .configuration
381            .duration
382            .map(sleep)
383            .map(Box::pin)
384            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
385            .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);
386
387        let cancellation = tokio::select! {
388
389            timeout = duration => {
390                debug!(?timeout);
391                token.cancel();
392                Some(CancelKind::Timeout)
393            }
394
395            completed = join_all => {
396                debug!(?completed);
397                None
398            }
399
400            interrupt = interrupt_signal.recv() => {
401                debug!(?interrupt);
402                Some(CancelKind::Interrupt)
403            }
404
405            terminate = terminate_signal.recv() => {
406                debug!(?terminate);
407                Some(CancelKind::Terminate)
408            }
409
410        };
411
412        debug!(?cancellation);
413
414        if let Some(meter_provider) = meter_provider {
415            meter_provider
416                .force_flush()
417                .inspect(|force_flush| debug!(?force_flush))?;
418
419            meter_provider
420                .shutdown()
421                .inspect(|shutdown| debug!(?shutdown))?;
422        }
423
424        if let Some(CancelKind::Timeout) = cancellation {
425            sleep(Duration::from_secs(5)).await;
426        }
427
428        debug!(abort = set.len());
429        set.abort_all();
430
431        while !set.is_empty() {
432            _ = set.join_next().await;
433        }
434
435        Ok(ErrorCode::None)
436    }
437
438    pub fn builder()
439    -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
440        Builder::default()
441    }
442}
443
444#[derive(Clone, Debug)]
445pub struct Builder<B, T, P, S> {
446    broker: B,
447    topic: T,
448    partition: P,
449    schema_registry: S,
450    batch_size: u32,
451    per_second: Option<u32>,
452    producers: u32,
453    duration: Option<Duration>,
454    otlp_endpoint_url: Option<Url>,
455}
456
457impl Default
458    for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
459{
460    fn default() -> Self {
461        Self {
462            broker: Default::default(),
463            topic: Default::default(),
464            partition: Default::default(),
465            schema_registry: Default::default(),
466            batch_size: 1,
467            per_second: None,
468            producers: 1,
469            duration: None,
470            otlp_endpoint_url: None,
471        }
472    }
473}
474
475impl<B, T, P, S> Builder<B, T, P, S> {
476    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
477        Builder {
478            broker: broker.into(),
479            topic: self.topic,
480            partition: self.partition,
481            schema_registry: self.schema_registry,
482            batch_size: self.batch_size,
483            per_second: self.per_second,
484            producers: self.producers,
485            duration: self.duration,
486            otlp_endpoint_url: self.otlp_endpoint_url,
487        }
488    }
489
490    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
491        Builder {
492            broker: self.broker,
493            topic: topic.into(),
494            partition: self.partition,
495            schema_registry: self.schema_registry,
496            batch_size: self.batch_size,
497            per_second: self.per_second,
498            producers: self.producers,
499            duration: self.duration,
500            otlp_endpoint_url: self.otlp_endpoint_url,
501        }
502    }
503
504    pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
505        Builder {
506            broker: self.broker,
507            topic: self.topic,
508            partition,
509            schema_registry: self.schema_registry,
510            batch_size: self.batch_size,
511            per_second: self.per_second,
512            producers: self.producers,
513            duration: self.duration,
514            otlp_endpoint_url: self.otlp_endpoint_url,
515        }
516    }
517
518    pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
519        Builder {
520            broker: self.broker,
521            topic: self.topic,
522            partition: self.partition,
523            schema_registry,
524            batch_size: self.batch_size,
525            per_second: self.per_second,
526            producers: self.producers,
527            duration: self.duration,
528            otlp_endpoint_url: self.otlp_endpoint_url,
529        }
530    }
531
532    pub fn batch_size(self, batch_size: u32) -> Builder<B, T, P, S> {
533        Builder {
534            broker: self.broker,
535            topic: self.topic,
536            partition: self.partition,
537            schema_registry: self.schema_registry,
538            batch_size,
539            per_second: self.per_second,
540            producers: self.producers,
541            duration: self.duration,
542            otlp_endpoint_url: self.otlp_endpoint_url,
543        }
544    }
545
546    pub fn per_second(self, per_second: Option<u32>) -> Builder<B, T, P, S> {
547        Builder {
548            broker: self.broker,
549            topic: self.topic,
550            partition: self.partition,
551            schema_registry: self.schema_registry,
552            batch_size: self.batch_size,
553            per_second,
554            producers: self.producers,
555            duration: self.duration,
556            otlp_endpoint_url: self.otlp_endpoint_url,
557        }
558    }
559
560    pub fn producers(self, producers: u32) -> Builder<B, T, P, S> {
561        Builder {
562            broker: self.broker,
563            topic: self.topic,
564            partition: self.partition,
565            schema_registry: self.schema_registry,
566            batch_size: self.batch_size,
567            per_second: self.per_second,
568            producers,
569            duration: self.duration,
570            otlp_endpoint_url: self.otlp_endpoint_url,
571        }
572    }
573
574    pub fn duration(self, duration: Option<Duration>) -> Builder<B, T, P, S> {
575        Builder {
576            broker: self.broker,
577            topic: self.topic,
578            partition: self.partition,
579            schema_registry: self.schema_registry,
580            batch_size: self.batch_size,
581            per_second: self.per_second,
582            producers: self.producers,
583            duration,
584            otlp_endpoint_url: self.otlp_endpoint_url,
585        }
586    }
587
588    pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Builder<B, T, P, S> {
589        Builder {
590            broker: self.broker,
591            topic: self.topic,
592            partition: self.partition,
593            schema_registry: self.schema_registry,
594            batch_size: self.batch_size,
595            per_second: self.per_second,
596            producers: self.producers,
597            duration: self.duration,
598            otlp_endpoint_url,
599        }
600    }
601}
602
603impl Builder<Url, String, i32, Url> {
604    pub fn build(self) -> Result<Generate> {
605        Generate::try_from(Configuration {
606            broker: self.broker,
607            topic: self.topic,
608            partition: self.partition,
609            schema_registry: self.schema_registry,
610            batch_size: self.batch_size,
611            per_second: self.per_second,
612            producers: self.producers,
613            duration: self.duration,
614            otlp_endpoint_url: self.otlp_endpoint_url,
615        })
616    }
617}