use std::{
    fmt, io,
    marker::PhantomData,
    num::NonZeroU32,
    pin::Pin,
    result,
    sync::{Arc, LazyLock, PoisonError},
    time::{Duration, Instant, SystemTime},
};

use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
use nonzero_ext::nonzero;
use opentelemetry::{
    InstrumentationScope, KeyValue, global,
    metrics::{Counter, Histogram, Meter},
};
use opentelemetry_otlp::ExporterBuildError;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tansu_client::{Client, ConnectionManager};
use tansu_otel::meter_provider;
use tansu_sans_io::{
    ErrorCode, ProduceRequest,
    produce_request::{PartitionProduceData, TopicProduceData},
    record::{deflated, inflated},
};
use tansu_schema::{Generator as _, Registry, Schema};
use tokio::{
    signal::unix::{SignalKind, signal},
    task::JoinSet,
    time::{sleep, timeout},
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, span};
use url::Url;
54
55pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
56 global::meter_with_scope(
57 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
58 .with_version(env!("CARGO_PKG_VERSION"))
59 .with_schema_url(SCHEMA_URL)
60 .build(),
61 )
62});
63
64pub type Result<T, E = Error> = result::Result<T, E>;
65
66#[derive(thiserror::Error, Debug)]
67pub enum Error {
68 Api(ErrorCode),
69 Client(#[from] tansu_client::Error),
70 ExporterBuild(#[from] ExporterBuildError),
71 InsufficientCapacity(#[from] InsufficientCapacity),
72 Io(Arc<io::Error>),
73 Otel(#[from] tansu_otel::Error),
74 OtelSdk(#[from] OTelSdkError),
75 Poison,
76 Protocol(#[from] tansu_sans_io::Error),
77 Schema(Box<tansu_schema::Error>),
78 SchemaNotFoundForTopic(String),
79 UnknownHost(String),
80 Url(#[from] url::ParseError),
81}
82
83impl<T> From<PoisonError<T>> for Error {
84 fn from(_value: PoisonError<T>) -> Self {
85 Self::Poison
86 }
87}
88
89impl From<tansu_schema::Error> for Error {
90 fn from(error: tansu_schema::Error) -> Self {
91 Self::Schema(Box::new(error))
92 }
93}
94
95impl From<io::Error> for Error {
96 fn from(value: io::Error) -> Self {
97 Self::Io(Arc::new(value))
98 }
99}
100
101impl fmt::Display for Error {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 write!(f, "{self:?}")
104 }
105}
106
/// Reason [`Generate::main`] stopped its producers.
///
/// Variant order is significant: it defines the derived `Ord`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelKind {
    /// SIGINT was received.
    Interrupt,
    /// SIGTERM was received.
    Terminate,
    /// The configured run duration elapsed.
    Timeout,
}
113
114#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
115pub struct Configuration {
116 broker: Url,
117 topic: String,
118 partition: i32,
119 schema_registry: Url,
120 batch_size: u32,
121 per_second: Option<u32>,
122 producers: u32,
123 duration: Option<Duration>,
124 otlp_endpoint_url: Option<Url>,
125}
126
127#[derive(Clone, Debug)]
128pub struct Generate {
129 configuration: Configuration,
130 registry: Registry,
131}
132
133impl TryFrom<Configuration> for Generate {
134 type Error = Error;
135
136 fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
137 Registry::builder_try_from_url(&configuration.schema_registry)
138 .map(|builder| builder.build())
139 .map(|registry| Self {
140 configuration,
141 registry,
142 })
143 .map_err(Into::into)
144 }
145}
146
147static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
148 METER
149 .u64_histogram("generate_produce_batch_duration")
150 .with_unit("ms")
151 .with_description("Generate a produce batch in milliseconds")
152 .build()
153});
154
155static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
156 METER
157 .u64_histogram("produce_request_response_duration")
158 .with_unit("ms")
159 .with_description("Latency of receiving an produce response in milliseconds")
160 .build()
161});
162
163pub async fn produce(
164 client: Client,
165 name: String,
166 index: i32,
167 schema: Schema,
168 batch_size: i32,
169) -> Result<()> {
170 debug!(?client, %name, index, batch_size);
171
172 let attributes = [
173 KeyValue::new("topic", name.clone()),
174 KeyValue::new("partition", index.to_string()),
175 KeyValue::new("batch_size", batch_size.to_string()),
176 ];
177
178 let frame = {
179 let start = SystemTime::now();
180
181 let mut batch = inflated::Batch::builder();
182 let offset_deltas = 0..batch_size;
183
184 for offset_delta in offset_deltas {
185 batch = schema
186 .generate()
187 .map(|record| record.offset_delta(offset_delta))
188 .map(|record| batch.record(record))?;
189 }
190
191 batch
192 .last_offset_delta(batch_size)
193 .build()
194 .map(|batch| inflated::Frame {
195 batches: vec![batch],
196 })
197 .and_then(deflated::Frame::try_from)
198 .inspect(|_| {
199 GENERATE_PRODUCE_BATCH_DURATION.record(
200 start
201 .elapsed()
202 .map_or(0, |duration| duration.as_millis() as u64),
203 &attributes,
204 )
205 })?
206 };
207
208 let req = ProduceRequest::default().topic_data(Some(
209 [TopicProduceData::default().name(name).partition_data(Some(
210 [PartitionProduceData::default()
211 .index(index)
212 .records(Some(frame))]
213 .into(),
214 ))]
215 .into(),
216 ));
217
218 let start = SystemTime::now();
219
220 let response = client.call(req).await.inspect(|_| {
221 PRODUCE_REQUEST_RESPONSE_DURATION.record(
222 start
223 .elapsed()
224 .map_or(0, |duration| duration.as_millis() as u64),
225 &attributes,
226 )
227 })?;
228
229 assert!(
230 response
231 .responses
232 .unwrap_or_default()
233 .into_iter()
234 .all(|topic| {
235 topic
236 .partition_responses
237 .unwrap_or_default()
238 .iter()
239 .inspect(|partition| debug!(topic = %topic.name, ?partition))
240 .all(|partition| partition.error_code == i16::from(ErrorCode::None))
241 })
242 );
243
244 Ok(())
245}
246
247static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
248 METER
249 .u64_histogram("rate_limit_duration")
250 .with_unit("ms")
251 .with_description("Rate limit latencies in milliseconds")
252 .build()
253});
254
255static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
256 METER
257 .u64_counter("produce_record_count")
258 .with_description("Produced record count")
259 .build()
260});
261
262static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
263 METER
264 .u64_histogram("produce_duration")
265 .with_unit("ms")
266 .with_description("Produce API latencies in milliseconds")
267 .build()
268});
269
270impl Generate {
271 pub async fn main(self) -> Result<ErrorCode> {
272 let meter_provider = self
273 .configuration
274 .otlp_endpoint_url
275 .map_or(Ok(None), |otlp_endpoint_url| {
276 meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
277 })?;
278
279 let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
280 return Err(Error::SchemaNotFoundForTopic(
281 self.configuration.topic.clone(),
282 ));
283 };
284
285 let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
286 debug!(?interrupt_signal);
287
288 let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
289 debug!(?terminate_signal);
290
291 let rate_limiter = self
292 .configuration
293 .per_second
294 .and_then(NonZeroU32::new)
295 .map(Quota::per_second)
296 .map(RateLimiter::direct)
297 .map(Arc::new)
298 .inspect(|rate_limiter| debug!(?rate_limiter));
299
300 let batch_size = NonZeroU32::new(self.configuration.batch_size)
301 .inspect(|batch_size| debug!(batch_size = batch_size.get()))
302 .unwrap_or(nonzero!(10u32));
303
304 let mut set = JoinSet::new();
305
306 let token = CancellationToken::new();
307
308 let client = ConnectionManager::builder(self.configuration.broker)
309 .client_id(Some(env!("CARGO_PKG_NAME").into()))
310 .build()
311 .await
312 .inspect(|pool| debug!(?pool))
313 .map(Client::new)?;
314
315 for producer in 0..self.configuration.producers {
316 let rate_limiter = rate_limiter.clone();
317 let schema = schema.clone();
318 let topic = self.configuration.topic.clone();
319 let partition = self.configuration.partition;
320 let token = token.clone();
321 let client = client.clone();
322
323 _ = set.spawn(async move {
324 let span = span!(Level::DEBUG, "producer", producer);
325
326 async move {
327 let attributes = [KeyValue::new("producer", producer.to_string())];
328
329 loop {
330 debug!(%topic, partition);
331
332 if let Some(ref rate_limiter) = rate_limiter {
333 let rate_limit_start = SystemTime::now();
334
335 tokio::select! {
336 cancelled = token.cancelled() => {
337 debug!(?cancelled);
338 break
339 },
340
341 Ok(_) = rate_limiter.until_n_ready_with_jitter(batch_size, Jitter::up_to(Duration::from_millis(50))) => {
342 RATE_LIMIT_DURATION.record(
343 rate_limit_start
344 .elapsed()
345 .map_or(0, |duration| duration.as_millis() as u64),
346 &attributes)
347
348 },
349 }
350 }
351
352 let produce_start = SystemTime::now();
353
354 tokio::select! {
355 cancelled = token.cancelled() => {
356 debug!(?cancelled);
357 break
358 },
359
360 Ok(_) = produce(client.clone(), topic.clone(), partition, schema.clone(), batch_size.get() as i32) => {
361 PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
362 PRODUCE_API_DURATION.record(produce_start.elapsed().map_or(0, |duration| duration.as_millis() as u64), &attributes);
363 },
364 }
365 }
366
367 }.instrument(span).await
368
369 });
370 }
371
372 let join_all = async {
373 while !set.is_empty() {
374 debug!(len = set.len());
375 _ = set.join_next().await;
376 }
377 };
378
379 let duration = self
380 .configuration
381 .duration
382 .map(sleep)
383 .map(Box::pin)
384 .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
385 .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);
386
387 let cancellation = tokio::select! {
388
389 timeout = duration => {
390 debug!(?timeout);
391 token.cancel();
392 Some(CancelKind::Timeout)
393 }
394
395 completed = join_all => {
396 debug!(?completed);
397 None
398 }
399
400 interrupt = interrupt_signal.recv() => {
401 debug!(?interrupt);
402 Some(CancelKind::Interrupt)
403 }
404
405 terminate = terminate_signal.recv() => {
406 debug!(?terminate);
407 Some(CancelKind::Terminate)
408 }
409
410 };
411
412 debug!(?cancellation);
413
414 if let Some(meter_provider) = meter_provider {
415 meter_provider
416 .force_flush()
417 .inspect(|force_flush| debug!(?force_flush))?;
418
419 meter_provider
420 .shutdown()
421 .inspect(|shutdown| debug!(?shutdown))?;
422 }
423
424 if let Some(CancelKind::Timeout) = cancellation {
425 sleep(Duration::from_secs(5)).await;
426 }
427
428 debug!(abort = set.len());
429 set.abort_all();
430
431 while !set.is_empty() {
432 _ = set.join_next().await;
433 }
434
435 Ok(ErrorCode::None)
436 }
437
438 pub fn builder()
439 -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
440 Builder::default()
441 }
442}
443
444#[derive(Clone, Debug)]
445pub struct Builder<B, T, P, S> {
446 broker: B,
447 topic: T,
448 partition: P,
449 schema_registry: S,
450 batch_size: u32,
451 per_second: Option<u32>,
452 producers: u32,
453 duration: Option<Duration>,
454 otlp_endpoint_url: Option<Url>,
455}
456
457impl Default
458 for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
459{
460 fn default() -> Self {
461 Self {
462 broker: Default::default(),
463 topic: Default::default(),
464 partition: Default::default(),
465 schema_registry: Default::default(),
466 batch_size: 1,
467 per_second: None,
468 producers: 1,
469 duration: None,
470 otlp_endpoint_url: None,
471 }
472 }
473}
474
475impl<B, T, P, S> Builder<B, T, P, S> {
476 pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
477 Builder {
478 broker: broker.into(),
479 topic: self.topic,
480 partition: self.partition,
481 schema_registry: self.schema_registry,
482 batch_size: self.batch_size,
483 per_second: self.per_second,
484 producers: self.producers,
485 duration: self.duration,
486 otlp_endpoint_url: self.otlp_endpoint_url,
487 }
488 }
489
490 pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
491 Builder {
492 broker: self.broker,
493 topic: topic.into(),
494 partition: self.partition,
495 schema_registry: self.schema_registry,
496 batch_size: self.batch_size,
497 per_second: self.per_second,
498 producers: self.producers,
499 duration: self.duration,
500 otlp_endpoint_url: self.otlp_endpoint_url,
501 }
502 }
503
504 pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
505 Builder {
506 broker: self.broker,
507 topic: self.topic,
508 partition,
509 schema_registry: self.schema_registry,
510 batch_size: self.batch_size,
511 per_second: self.per_second,
512 producers: self.producers,
513 duration: self.duration,
514 otlp_endpoint_url: self.otlp_endpoint_url,
515 }
516 }
517
518 pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
519 Builder {
520 broker: self.broker,
521 topic: self.topic,
522 partition: self.partition,
523 schema_registry,
524 batch_size: self.batch_size,
525 per_second: self.per_second,
526 producers: self.producers,
527 duration: self.duration,
528 otlp_endpoint_url: self.otlp_endpoint_url,
529 }
530 }
531
532 pub fn batch_size(self, batch_size: u32) -> Builder<B, T, P, S> {
533 Builder {
534 broker: self.broker,
535 topic: self.topic,
536 partition: self.partition,
537 schema_registry: self.schema_registry,
538 batch_size,
539 per_second: self.per_second,
540 producers: self.producers,
541 duration: self.duration,
542 otlp_endpoint_url: self.otlp_endpoint_url,
543 }
544 }
545
546 pub fn per_second(self, per_second: Option<u32>) -> Builder<B, T, P, S> {
547 Builder {
548 broker: self.broker,
549 topic: self.topic,
550 partition: self.partition,
551 schema_registry: self.schema_registry,
552 batch_size: self.batch_size,
553 per_second,
554 producers: self.producers,
555 duration: self.duration,
556 otlp_endpoint_url: self.otlp_endpoint_url,
557 }
558 }
559
560 pub fn producers(self, producers: u32) -> Builder<B, T, P, S> {
561 Builder {
562 broker: self.broker,
563 topic: self.topic,
564 partition: self.partition,
565 schema_registry: self.schema_registry,
566 batch_size: self.batch_size,
567 per_second: self.per_second,
568 producers,
569 duration: self.duration,
570 otlp_endpoint_url: self.otlp_endpoint_url,
571 }
572 }
573
574 pub fn duration(self, duration: Option<Duration>) -> Builder<B, T, P, S> {
575 Builder {
576 broker: self.broker,
577 topic: self.topic,
578 partition: self.partition,
579 schema_registry: self.schema_registry,
580 batch_size: self.batch_size,
581 per_second: self.per_second,
582 producers: self.producers,
583 duration,
584 otlp_endpoint_url: self.otlp_endpoint_url,
585 }
586 }
587
588 pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Builder<B, T, P, S> {
589 Builder {
590 broker: self.broker,
591 topic: self.topic,
592 partition: self.partition,
593 schema_registry: self.schema_registry,
594 batch_size: self.batch_size,
595 per_second: self.per_second,
596 producers: self.producers,
597 duration: self.duration,
598 otlp_endpoint_url,
599 }
600 }
601}
602
603impl Builder<Url, String, i32, Url> {
604 pub fn build(self) -> Result<Generate> {
605 Generate::try_from(Configuration {
606 broker: self.broker,
607 topic: self.topic,
608 partition: self.partition,
609 schema_registry: self.schema_registry,
610 batch_size: self.batch_size,
611 per_second: self.per_second,
612 producers: self.producers,
613 duration: self.duration,
614 otlp_endpoint_url: self.otlp_endpoint_url,
615 })
616 }
617}