tansu_generator/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Tansu Message Generator
16//!
17//! Generate fake data for schema backed topics.
18
19use std::{
20    fmt, io,
21    marker::PhantomData,
22    net::SocketAddr,
23    num::NonZeroU32,
24    pin::Pin,
25    result,
26    sync::{Arc, LazyLock, PoisonError},
27    time::{Duration, SystemTime},
28};
29
30use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
31use nonzero_ext::nonzero;
32use opentelemetry::{
33    InstrumentationScope, KeyValue, global,
34    metrics::{Counter, Histogram, Meter},
35};
36use opentelemetry_otlp::ExporterBuildError;
37use opentelemetry_sdk::error::OTelSdkError;
38use opentelemetry_semantic_conventions::SCHEMA_URL;
39use rama::{
40    Context, Service,
41    error::{BoxError, OpaqueError},
42};
43use tansu_otel::meter_provider;
44use tansu_sans_io::{
45    ErrorCode,
46    produce_request::{PartitionProduceData, TopicProduceData},
47    record::{deflated, inflated},
48};
49use tansu_schema::{Generator as _, Registry, Schema};
50use tansu_service::{
51    api::produce::{ProduceRequest, ProduceResponse},
52    service::ApiClient,
53};
54use tokio::{
55    net::lookup_host,
56    signal::unix::{SignalKind, signal},
57    task::JoinSet,
58    time::sleep,
59};
60use tokio_util::sync::CancellationToken;
61use tracing::{Instrument, Level, debug, span};
62use url::Url;
63
64pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
65    global::meter_with_scope(
66        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
67            .with_version(env!("CARGO_PKG_VERSION"))
68            .with_schema_url(SCHEMA_URL)
69            .build(),
70    )
71});
72
73pub type Result<T, E = Error> = result::Result<T, E>;
74
75#[derive(thiserror::Error, Debug)]
76pub enum Error {
77    Api(ErrorCode),
78    Box(#[from] BoxError),
79    ExporterBuild(#[from] ExporterBuildError),
80    InsufficientCapacity(#[from] InsufficientCapacity),
81    Io(Arc<io::Error>),
82    Opaque(#[from] OpaqueError),
83    Otel(#[from] tansu_otel::Error),
84    OtelSdk(#[from] OTelSdkError),
85    Poison,
86    Protocol(#[from] tansu_sans_io::Error),
87    Schema(Box<tansu_schema::Error>),
88    SchemaNotFoundForTopic(String),
89    UnknownHost(String),
90    Url(#[from] url::ParseError),
91}
92
93impl<T> From<PoisonError<T>> for Error {
94    fn from(_value: PoisonError<T>) -> Self {
95        Self::Poison
96    }
97}
98
99impl From<tansu_schema::Error> for Error {
100    fn from(error: tansu_schema::Error) -> Self {
101        Self::Schema(Box::new(error))
102    }
103}
104
105impl From<io::Error> for Error {
106    fn from(value: io::Error) -> Self {
107        Self::Io(Arc::new(value))
108    }
109}
110
111impl fmt::Display for Error {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        write!(f, "{self:?}")
114    }
115}
116
/// Why [`Generate::main`] stopped waiting for its producers.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CancelKind {
    /// The interrupt signal was received.
    Interrupt,

    /// The terminate signal was received.
    Terminate,

    /// The configured run duration elapsed.
    Timeout,
}
123
124#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
125pub struct Configuration {
126    broker: Url,
127    topic: String,
128    partition: i32,
129    schema_registry: Url,
130    batch_size: u32,
131    per_second: Option<u32>,
132    producers: u32,
133    duration: Option<Duration>,
134    otlp_endpoint_url: Option<Url>,
135}
136
137#[derive(Clone, Debug)]
138pub struct Generate {
139    configuration: Configuration,
140    registry: Registry,
141}
142
143impl TryFrom<Configuration> for Generate {
144    type Error = Error;
145
146    fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
147        Registry::try_from(&configuration.schema_registry)
148            .map(|registry| Self {
149                configuration,
150                registry,
151            })
152            .map_err(Into::into)
153    }
154}
155
156static DNS_LOOKUP_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
157    METER
158        .u64_histogram("dns_lookup_duration")
159        .with_unit("ms")
160        .with_description("DNS lookup latencies")
161        .build()
162});
163
164async fn host_port(url: &Url) -> Result<SocketAddr> {
165    if let Some(host) = url.host_str()
166        && let Some(port) = url.port()
167    {
168        let attributes = [KeyValue::new("url", url.to_string())];
169        let start = SystemTime::now();
170
171        let mut addresses = lookup_host(format!("{host}:{port}"))
172            .await
173            .inspect(|_| {
174                DNS_LOOKUP_DURATION.record(
175                    start
176                        .elapsed()
177                        .map_or(0, |duration| duration.as_millis() as u64),
178                    &attributes,
179                )
180            })?
181            .filter(|socket_addr| matches!(socket_addr, SocketAddr::V4(_)));
182
183        if let Some(socket_addr) = addresses.next().inspect(|socket_addr| debug!(?socket_addr)) {
184            return Ok(socket_addr);
185        }
186    }
187
188    Err(Error::UnknownHost(url.to_string()))
189}
190
191static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
192    METER
193        .u64_histogram("generate_produce_batch_duration")
194        .with_unit("ms")
195        .with_description("Generate a produce batch in milliseconds")
196        .build()
197});
198
199static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
200    METER
201        .u64_histogram("produce_request_response_duration")
202        .with_unit("ms")
203        .with_description("Latency of receiving an produce response in milliseconds")
204        .build()
205});
206
207pub async fn produce(
208    broker: Url,
209    name: String,
210    index: i32,
211    schema: Schema,
212    batch_size: i32,
213) -> Result<()> {
214    debug!(%broker, %name, index, batch_size);
215
216    let attributes = [
217        KeyValue::new("topic", name.clone()),
218        KeyValue::new("partition", index.to_string()),
219        KeyValue::new("batch_size", batch_size.to_string()),
220    ];
221
222    let frame = {
223        let start = SystemTime::now();
224
225        let mut batch = inflated::Batch::builder();
226        let offset_deltas = 0..batch_size;
227
228        for offset_delta in offset_deltas {
229            batch = schema
230                .generate()
231                .map(|record| record.offset_delta(offset_delta))
232                .map(|record| batch.record(record))?;
233        }
234
235        batch
236            .last_offset_delta(batch_size)
237            .build()
238            .map(|batch| inflated::Frame {
239                batches: vec![batch],
240            })
241            .and_then(deflated::Frame::try_from)
242            .inspect(|_| {
243                GENERATE_PRODUCE_BATCH_DURATION.record(
244                    start
245                        .elapsed()
246                        .map_or(0, |duration| duration.as_millis() as u64),
247                    &attributes,
248                )
249            })?
250    };
251
252    let request = ProduceRequest {
253        topic_data: Some(
254            [TopicProduceData::default().name(name).partition_data(Some(
255                [PartitionProduceData::default()
256                    .index(index)
257                    .records(Some(frame))]
258                .into(),
259            ))]
260            .into(),
261        ),
262        ..Default::default()
263    };
264
265    let origin = host_port(&broker)
266        .await
267        .map(Into::into)
268        .map(ApiClient::new)
269        .inspect(|client| debug!(?client))?;
270
271    let response = {
272        let start = SystemTime::now();
273
274        let attributes = [];
275
276        origin
277            .serve(Context::default(), request.into())
278            .await
279            .and_then(|response| ProduceResponse::try_from(response).map_err(Into::into))
280            .inspect(|response| debug!(?response))
281            .inspect(|_| {
282                PRODUCE_REQUEST_RESPONSE_DURATION.record(
283                    start
284                        .elapsed()
285                        .map_or(0, |duration| duration.as_millis() as u64),
286                    &attributes,
287                )
288            })
289    }?;
290
291    assert!(
292        response
293            .responses
294            .unwrap_or_default()
295            .into_iter()
296            .all(|topic| {
297                topic
298                    .partition_responses
299                    .unwrap_or_default()
300                    .iter()
301                    .inspect(|partition| debug!(topic = %topic.name, ?partition))
302                    .all(|partition| partition.error_code == i16::from(ErrorCode::None))
303            })
304    );
305
306    Ok(())
307}
308
309static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
310    METER
311        .u64_histogram("rate_limit_duration")
312        .with_unit("ms")
313        .with_description("Rate limit latencies in milliseconds")
314        .build()
315});
316
317static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
318    METER
319        .u64_counter("produce_record_count")
320        .with_description("Produced record count")
321        .build()
322});
323
324static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
325    METER
326        .u64_histogram("produce_duration")
327        .with_unit("ms")
328        .with_description("Produce API latencies in milliseconds")
329        .build()
330});
331
332impl Generate {
333    pub async fn main(self) -> Result<ErrorCode> {
334        let meter_provider = self
335            .configuration
336            .otlp_endpoint_url
337            .map_or(Ok(None), |otlp_endpoint_url| {
338                meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
339            })?;
340
341        let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
342            return Err(Error::SchemaNotFoundForTopic(
343                self.configuration.topic.clone(),
344            ));
345        };
346
347        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
348        debug!(?interrupt_signal);
349
350        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
351        debug!(?terminate_signal);
352
353        let rate_limiter = self
354            .configuration
355            .per_second
356            .and_then(NonZeroU32::new)
357            .map(Quota::per_second)
358            .map(RateLimiter::direct)
359            .map(Arc::new)
360            .inspect(|rate_limiter| debug!(?rate_limiter));
361
362        let batch_size = NonZeroU32::new(self.configuration.batch_size)
363            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
364            .unwrap_or(nonzero!(10u32));
365
366        let mut set = JoinSet::new();
367
368        let token = CancellationToken::new();
369
370        for producer in 0..self.configuration.producers {
371            let rate_limiter = rate_limiter.clone();
372            let schema = schema.clone();
373            let broker = self.configuration.broker.clone();
374            let topic = self.configuration.topic.clone();
375            let partition = self.configuration.partition;
376            let token = token.clone();
377
378            _ = set.spawn(async move {
379                    let span = span!(Level::DEBUG, "producer", producer);
380
381
382                    async move {
383                        let attributes = [KeyValue::new("producer", producer.to_string())];
384
385                        loop {
386                            debug!(%broker, %topic, partition);
387
388                            if let Some(ref rate_limiter) = rate_limiter {
389                                let rate_limit_start = SystemTime::now();
390
391
392
393
394
395                                tokio::select! {
396                                    cancelled = token.cancelled() => {
397                                        debug!(?cancelled);
398                                        break
399                                    },
400
401                                    Ok(_) = rate_limiter.until_n_ready_with_jitter(batch_size, Jitter::up_to(Duration::from_millis(50))) => {
402                                        RATE_LIMIT_DURATION.record(
403                                        rate_limit_start
404                                            .elapsed()
405                                            .map_or(0, |duration| duration.as_millis() as u64),
406                                            &attributes)
407
408                                    },
409                                }
410                            }
411
412                            let produce_start = SystemTime::now();
413
414                            tokio::select! {
415                                cancelled = token.cancelled() => {
416                                    debug!(?cancelled);
417                                    break
418                                },
419
420                                Ok(_) = produce(broker.clone(), topic.clone(), partition, schema.clone(), batch_size.get() as i32) => {
421                                    PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
422                                    PRODUCE_API_DURATION.record(produce_start.elapsed().map_or(0, |duration| duration.as_millis() as u64), &attributes);
423                                },
424                            }
425                        }
426
427                    }.instrument(span).await
428
429                });
430        }
431
432        let join_all = async {
433            while !set.is_empty() {
434                debug!(len = set.len());
435                _ = set.join_next().await;
436            }
437        };
438
439        let duration = self
440            .configuration
441            .duration
442            .map(sleep)
443            .map(Box::pin)
444            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
445            .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);
446
447        let cancellation = tokio::select! {
448
449            timeout = duration => {
450                debug!(?timeout);
451                token.cancel();
452                Some(CancelKind::Timeout)
453            }
454
455            completed = join_all => {
456                debug!(?completed);
457                None
458            }
459
460            interrupt = interrupt_signal.recv() => {
461                debug!(?interrupt);
462                Some(CancelKind::Interrupt)
463            }
464
465            terminate = terminate_signal.recv() => {
466                debug!(?terminate);
467                Some(CancelKind::Terminate)
468            }
469
470        };
471
472        debug!(?cancellation);
473
474        if let Some(meter_provider) = meter_provider {
475            meter_provider
476                .force_flush()
477                .inspect(|force_flush| debug!(?force_flush))?;
478
479            meter_provider
480                .shutdown()
481                .inspect(|shutdown| debug!(?shutdown))?;
482        }
483
484        if let Some(CancelKind::Timeout) = cancellation {
485            sleep(Duration::from_secs(5)).await;
486        }
487
488        debug!(abort = set.len());
489        set.abort_all();
490
491        while !set.is_empty() {
492            _ = set.join_next().await;
493        }
494
495        Ok(ErrorCode::None)
496    }
497
498    pub fn builder()
499    -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
500        Builder::default()
501    }
502}
503
504#[derive(Clone, Debug)]
505pub struct Builder<B, T, P, S> {
506    broker: B,
507    topic: T,
508    partition: P,
509    schema_registry: S,
510    batch_size: u32,
511    per_second: Option<u32>,
512    producers: u32,
513    duration: Option<Duration>,
514    otlp_endpoint_url: Option<Url>,
515}
516
517impl Default
518    for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
519{
520    fn default() -> Self {
521        Self {
522            broker: Default::default(),
523            topic: Default::default(),
524            partition: Default::default(),
525            schema_registry: Default::default(),
526            batch_size: 1,
527            per_second: None,
528            producers: 1,
529            duration: None,
530            otlp_endpoint_url: None,
531        }
532    }
533}
534
535impl<B, T, P, S> Builder<B, T, P, S> {
536    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
537        Builder {
538            broker: broker.into(),
539            topic: self.topic,
540            partition: self.partition,
541            schema_registry: self.schema_registry,
542            batch_size: self.batch_size,
543            per_second: self.per_second,
544            producers: self.producers,
545            duration: self.duration,
546            otlp_endpoint_url: self.otlp_endpoint_url,
547        }
548    }
549
550    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
551        Builder {
552            broker: self.broker,
553            topic: topic.into(),
554            partition: self.partition,
555            schema_registry: self.schema_registry,
556            batch_size: self.batch_size,
557            per_second: self.per_second,
558            producers: self.producers,
559            duration: self.duration,
560            otlp_endpoint_url: self.otlp_endpoint_url,
561        }
562    }
563
564    pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
565        Builder {
566            broker: self.broker,
567            topic: self.topic,
568            partition,
569            schema_registry: self.schema_registry,
570            batch_size: self.batch_size,
571            per_second: self.per_second,
572            producers: self.producers,
573            duration: self.duration,
574            otlp_endpoint_url: self.otlp_endpoint_url,
575        }
576    }
577
578    pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
579        Builder {
580            broker: self.broker,
581            topic: self.topic,
582            partition: self.partition,
583            schema_registry,
584            batch_size: self.batch_size,
585            per_second: self.per_second,
586            producers: self.producers,
587            duration: self.duration,
588            otlp_endpoint_url: self.otlp_endpoint_url,
589        }
590    }
591
592    pub fn batch_size(self, batch_size: u32) -> Builder<B, T, P, S> {
593        Builder {
594            broker: self.broker,
595            topic: self.topic,
596            partition: self.partition,
597            schema_registry: self.schema_registry,
598            batch_size,
599            per_second: self.per_second,
600            producers: self.producers,
601            duration: self.duration,
602            otlp_endpoint_url: self.otlp_endpoint_url,
603        }
604    }
605
606    pub fn per_second(self, per_second: Option<u32>) -> Builder<B, T, P, S> {
607        Builder {
608            broker: self.broker,
609            topic: self.topic,
610            partition: self.partition,
611            schema_registry: self.schema_registry,
612            batch_size: self.batch_size,
613            per_second,
614            producers: self.producers,
615            duration: self.duration,
616            otlp_endpoint_url: self.otlp_endpoint_url,
617        }
618    }
619
620    pub fn producers(self, producers: u32) -> Builder<B, T, P, S> {
621        Builder {
622            broker: self.broker,
623            topic: self.topic,
624            partition: self.partition,
625            schema_registry: self.schema_registry,
626            batch_size: self.batch_size,
627            per_second: self.per_second,
628            producers,
629            duration: self.duration,
630            otlp_endpoint_url: self.otlp_endpoint_url,
631        }
632    }
633
634    pub fn duration(self, duration: Option<Duration>) -> Builder<B, T, P, S> {
635        Builder {
636            broker: self.broker,
637            topic: self.topic,
638            partition: self.partition,
639            schema_registry: self.schema_registry,
640            batch_size: self.batch_size,
641            per_second: self.per_second,
642            producers: self.producers,
643            duration,
644            otlp_endpoint_url: self.otlp_endpoint_url,
645        }
646    }
647
648    pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Builder<B, T, P, S> {
649        Builder {
650            broker: self.broker,
651            topic: self.topic,
652            partition: self.partition,
653            schema_registry: self.schema_registry,
654            batch_size: self.batch_size,
655            per_second: self.per_second,
656            producers: self.producers,
657            duration: self.duration,
658            otlp_endpoint_url,
659        }
660    }
661}
662
663impl Builder<Url, String, i32, Url> {
664    pub fn build(self) -> Result<Generate> {
665        Generate::try_from(Configuration {
666            broker: self.broker,
667            topic: self.topic,
668            partition: self.partition,
669            schema_registry: self.schema_registry,
670            batch_size: self.batch_size,
671            per_second: self.per_second,
672            producers: self.producers,
673            duration: self.duration,
674            otlp_endpoint_url: self.otlp_endpoint_url,
675        })
676    }
677}