use std::{
    fmt, io,
    marker::PhantomData,
    num::NonZeroU32,
    pin::Pin,
    result,
    sync::{Arc, LazyLock, PoisonError},
    time::{Duration, SystemTime},
};

use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
use nonzero_ext::nonzero;
use opentelemetry::{
    InstrumentationScope, KeyValue, global,
    metrics::{Counter, Histogram, Meter},
};
use opentelemetry_otlp::ExporterBuildError;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tansu_client::{Client, ConnectionManager};
use tansu_otel::meter_provider;
use tansu_sans_io::{
    ErrorCode, ProduceRequest,
    primitive::ByteSize,
    produce_request::{PartitionProduceData, TopicProduceData},
    record::{deflated, inflated},
};
use tansu_schema::{Generator as _, Registry, Schema};
use tokio::{
    signal::unix::{SignalKind, signal},
    task::JoinSet,
    time::sleep,
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, span};
use url::Url;

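/// Meter used by all instruments in this crate, scoped to the package name,
/// version and the OpenTelemetry semantic conventions schema URL.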
pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
    global::meter_with_scope(
        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(env!("CARGO_PKG_VERSION"))
            .with_schema_url(SCHEMA_URL)
            .build(),
    )
});

pub type Result<T, E = Error> = result::Result<T, E>;

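/// Errors that can arise while generating records, producing batches or
/// exporting metrics.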
#[derive(thiserror::Error, Debug)]
pub enum Error {
    Api(ErrorCode),
    Client(#[from] tansu_client::Error),
    ExporterBuild(#[from] ExporterBuildError),
    InsufficientCapacity(#[from] InsufficientCapacity),
    Io(Arc<io::Error>),
    Otel(#[from] tansu_otel::Error),
    OtelSdk(#[from] OTelSdkError),
    Poison,
    Protocol(#[from] tansu_sans_io::Error),
    Schema(Box<tansu_schema::Error>),
    SchemaNotFoundForTopic(String),
    UnknownHost(String),
    Url(#[from] url::ParseError),
}

impl<T> From<PoisonError<T>> for Error {
    fn from(_value: PoisonError<T>) -> Self {
        Self::Poison
    }
}

impl From<tansu_schema::Error> for Error {
    fn from(error: tansu_schema::Error) -> Self {
        Self::Schema(Box::new(error))
    }
}

impl From<io::Error> for Error {
    fn from(value: io::Error) -> Self {
        Self::Io(Arc::new(value))
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}

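/// Why the run was cancelled: a SIGINT, a SIGTERM or the configured duration
/// elapsing.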
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CancelKind {
    Interrupt,
    Terminate,
    Timeout,
}

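/// Runtime configuration for a generator: where to produce, how large each
/// batch is, optional rate limits, the number of producer tasks, an optional
/// run duration and an optional OTLP endpoint for metrics.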
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Configuration {
    broker: Url,
    topic: String,
    partition: i32,
    schema_registry: Url,
    batch_size: u32,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
    otlp_endpoint_url: Option<Url>,
}

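/// A generator that produces schema-generated record batches to a broker
/// topic, built from a [`Configuration`] and the schema [`Registry`] derived
/// from it.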
#[derive(Clone, Debug)]
pub struct Generate {
    configuration: Configuration,
    registry: Registry,
}

impl TryFrom<Configuration> for Generate {
    type Error = Error;

    fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
        Registry::builder_try_from_url(&configuration.schema_registry)
            .map(|builder| builder.build())
            .map(|registry| Self {
                configuration,
                registry,
            })
            .map_err(Into::into)
    }
}

static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("generate_produce_batch_duration")
        .with_unit("ms")
        .with_description("Time to generate a produce batch in milliseconds")
        .build()
});

static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("produce_request_response_duration")
        .with_unit("ms")
        .with_description("Latency of receiving a produce response in milliseconds")
        .build()
});

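/// Build a deflated record batch of `batch_size` schema-generated records for
/// the given topic and partition, recording the generation time in the
/// `generate_produce_batch_duration` histogram.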
fn frame(name: String, index: i32, schema: Schema, batch_size: i32) -> Result<deflated::Frame> {
    let attributes = [
        KeyValue::new("topic", name.clone()),
        KeyValue::new("partition", index.to_string()),
        KeyValue::new("batch_size", batch_size.to_string()),
    ];

    let start = SystemTime::now();

    let mut batch = inflated::Batch::builder();
    let offset_deltas = 0..batch_size;

    for offset_delta in offset_deltas {
        batch = schema
            .generate()
            .map(|record| record.offset_delta(offset_delta))
            .map(|record| batch.record(record))?;
    }

    batch
        .last_offset_delta(batch_size)
        .build()
        .map(|batch| inflated::Frame {
            batches: vec![batch],
        })
        .and_then(deflated::Frame::try_from)
        .inspect(|_| {
            GENERATE_PRODUCE_BATCH_DURATION.record(
                start
                    .elapsed()
                    .map_or(0, |duration| duration.as_millis() as u64),
                &attributes,
            )
        })
        .map_err(Into::into)
}

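/// Send `frame` in a single produce request for the given topic and partition,
/// recording the request/response latency and asserting that every partition
/// response returned [`ErrorCode::None`].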
pub async fn produce(
    client: Client,
    name: String,
    index: i32,
    batch_size: i32,
    frame: deflated::Frame,
) -> Result<()> {
    debug!(?client, %name, index, batch_size);

    let attributes = [
        KeyValue::new("topic", name.clone()),
        KeyValue::new("partition", index.to_string()),
        KeyValue::new("batch_size", batch_size.to_string()),
    ];

    let req = ProduceRequest::default().topic_data(Some(
        [TopicProduceData::default().name(name).partition_data(Some(
            [PartitionProduceData::default()
                .index(index)
                .records(Some(frame))]
            .into(),
        ))]
        .into(),
    ));

    let start = SystemTime::now();

    let response = client.call(req).await.inspect(|_| {
        PRODUCE_REQUEST_RESPONSE_DURATION.record(
            start
                .elapsed()
                .map_or(0, |duration| duration.as_millis() as u64),
            &attributes,
        )
    })?;

    assert!(
        response
            .responses
            .unwrap_or_default()
            .into_iter()
            .all(|topic| {
                topic
                    .partition_responses
                    .unwrap_or_default()
                    .iter()
                    .inspect(|partition| debug!(topic = %topic.name, ?partition))
                    .all(|partition| partition.error_code == i16::from(ErrorCode::None))
            })
    );

    Ok(())
}

static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("rate_limit_duration")
        .with_unit("ms")
        .with_description("Rate limit latencies in milliseconds")
        .build()
});

static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
    METER
        .u64_counter("produce_record_count")
        .with_description("Produced record count")
        .build()
});

static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
    METER
        .u64_histogram("produce_duration")
        .with_unit("ms")
        .with_description("Produce API latencies in milliseconds")
        .build()
});

impl Generate {
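    /// Run the generator: spawn `producers` tasks that generate and produce
    /// batches to the configured topic and partition, optionally rate limited,
    /// until the configured duration elapses or SIGINT/SIGTERM is received.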
    pub async fn main(self) -> Result<ErrorCode> {
        debug!(configuration = ?self.configuration);

        let meter_provider = self
            .configuration
            .otlp_endpoint_url
            .map_or(Ok(None), |otlp_endpoint_url| {
                meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
            })?;

        let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
            return Err(Error::SchemaNotFoundForTopic(
                self.configuration.topic.clone(),
            ));
        };

        let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
        debug!(?interrupt_signal);

        let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
        debug!(?terminate_signal);

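        // Optional rate limiter: the per-second quota is `per_second` (records)
        // when set, otherwise `throughput` (bytes).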
        let rate_limiter = self
            .configuration
            .per_second
            .or(self.configuration.throughput)
            .and_then(NonZeroU32::new)
            .map(Quota::per_second)
            .map(RateLimiter::direct)
            .map(Arc::new)
            .inspect(|rate_limiter| debug!(?rate_limiter));

        let batch_size = NonZeroU32::new(self.configuration.batch_size)
            .inspect(|batch_size| debug!(batch_size = batch_size.get()))
            .unwrap_or(nonzero!(10u32));

        let mut set = JoinSet::new();

        let token = CancellationToken::new();

        let client = ConnectionManager::builder(self.configuration.broker)
            .client_id(Some(env!("CARGO_PKG_NAME").into()))
            .build()
            .await
            .inspect(|pool| debug!(?pool))
            .map(Client::new)?;

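        // One task per configured producer, each generating and producing
        // batches in a loop until cancelled.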
        for producer in 0..self.configuration.producers {
            let rate_limiter = rate_limiter.clone();
            let schema = schema.clone();
            let topic = self.configuration.topic.clone();
            let partition = self.configuration.partition;
            let throughput = self.configuration.throughput;
            let token = token.clone();
            let client = client.clone();

            _ = set.spawn(async move {
                let span = span!(Level::DEBUG, "producer", producer);

                async move {
                    let attributes = [KeyValue::new("producer", producer.to_string())];

                    loop {
                        debug!(%topic, partition);

                        let Ok(frame) = frame(
                            topic.clone(),
                            partition,
                            schema.clone(),
                            batch_size.get() as i32,
                        ) else {
                            break
                        };

                        if let Some(ref rate_limiter) = rate_limiter {
                            let rate_limit_start = SystemTime::now();

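                            // With a `throughput` (bytes per second) limit, charge the limiter
                            // for the deflated frame's size in bytes; otherwise charge one cell
                            // per record in the batch.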
                            let cells = throughput
                                .and(
                                    frame
                                        .size_in_bytes()
                                        .ok()
                                        .and_then(|bytes| NonZeroU32::new(bytes as u32)),
                                )
                                .unwrap_or(batch_size);
                            debug!(cells);

                            tokio::select! {
                                cancelled = token.cancelled() => {
                                    debug!(?cancelled);
                                    break
                                },

                                Ok(_) = rate_limiter.until_n_ready_with_jitter(
                                    cells,
                                    Jitter::up_to(Duration::from_millis(50)),
                                ) => {
                                    RATE_LIMIT_DURATION.record(
                                        rate_limit_start
                                            .elapsed()
                                            .inspect(|duration| {
                                                debug!(rate_limit_duration_ms = duration.as_millis())
                                            })
                                            .map_or(0, |duration| duration.as_millis() as u64),
                                        &attributes,
                                    )
                                },
                            }
                        }

                        let produce_start = SystemTime::now();

                        tokio::select! {
                            cancelled = token.cancelled() => {
                                debug!(?cancelled);
                                break
                            },

                            Ok(_) = produce(
                                client.clone(),
                                topic.clone(),
                                partition,
                                batch_size.get() as i32,
                                frame,
                            ) => {
                                PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
                                PRODUCE_API_DURATION.record(
                                    produce_start
                                        .elapsed()
                                        .inspect(|duration| {
                                            debug!(produce_duration_ms = duration.as_millis())
                                        })
                                        .map_or(0, |duration| duration.as_millis() as u64),
                                    &attributes,
                                );
                            },
                        }
                    }
                }.instrument(span).await;
            });
        }

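        // Resolves once every producer task has completed.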
        let join_all = async {
            while !set.is_empty() {
                debug!(len = set.len());
                _ = set.join_next().await;
            }
        };

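        // Run for the configured duration, or forever when none is set.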
        let duration = self
            .configuration
            .duration
            .map(sleep)
            .map(Box::pin)
            .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
            .unwrap_or(
                Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>,
            );

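        // Wait for whichever comes first: the run duration elapsing, all
        // producers completing, SIGINT or SIGTERM.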
        let cancellation = tokio::select! {
            timeout = duration => {
                debug!(?timeout);
                token.cancel();
                Some(CancelKind::Timeout)
            }

            completed = join_all => {
                debug!(?completed);
                None
            }

            interrupt = interrupt_signal.recv() => {
                debug!(?interrupt);
                Some(CancelKind::Interrupt)
            }

            terminate = terminate_signal.recv() => {
                debug!(?terminate);
                Some(CancelKind::Terminate)
            }
        };

        debug!(?cancellation);

        if let Some(meter_provider) = meter_provider {
            meter_provider
                .force_flush()
                .inspect(|force_flush| debug!(?force_flush))?;

            meter_provider
                .shutdown()
                .inspect(|shutdown| debug!(?shutdown))?;
        }

        if let Some(CancelKind::Timeout) = cancellation {
            sleep(Duration::from_secs(5)).await;
        }

        debug!(abort = set.len());
        set.abort_all();

        while !set.is_empty() {
            _ = set.join_next().await;
        }

        Ok(ErrorCode::None)
    }

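    /// Start building a [`Generate`]: the returned [`Builder`] requires the
    /// broker, topic, partition and schema registry before `build` is available.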
    pub fn builder()
    -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
        Builder::default()
    }
}

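/// Typestate builder for [`Generate`]: the type parameters track whether the
/// broker, topic, partition and schema registry have been supplied, and
/// `build` only becomes available once all four are set.
///
/// A minimal sketch of wiring one up (the URLs and values here are
/// illustrative, not taken from a real deployment):
///
/// ```ignore
/// use std::time::Duration;
/// use url::Url;
///
/// let generate = Generate::builder()
///     .broker(Url::parse("tcp://localhost:9092")?)
///     .topic("demo")
///     .partition(0)
///     .schema_registry(Url::parse("file://./etc/schema")?)
///     .batch_size(100)
///     .per_second(Some(1_000))
///     .producers(4)
///     .duration(Some(Duration::from_secs(60)))
///     .build()?;
///
/// generate.main().await?;
/// ```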
#[derive(Clone, Debug)]
pub struct Builder<B, T, P, S> {
    broker: B,
    topic: T,
    partition: P,
    schema_registry: S,
    batch_size: u32,
    per_second: Option<u32>,
    throughput: Option<u32>,
    producers: u32,
    duration: Option<Duration>,
    otlp_endpoint_url: Option<Url>,
}

impl Default
    for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
{
    fn default() -> Self {
        Self {
            broker: Default::default(),
            topic: Default::default(),
            partition: Default::default(),
            schema_registry: Default::default(),
            batch_size: 1,
            per_second: None,
            throughput: None,
            producers: 1,
            duration: None,
            otlp_endpoint_url: None,
        }
    }
}

impl<B, T, P, S> Builder<B, T, P, S> {
    pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
        Builder {
            broker: broker.into(),
            topic: self.topic,
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
        Builder {
            broker: self.broker,
            topic: topic.into(),
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
        Builder {
            broker: self.broker,
            topic: self.topic,
            partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
        Builder {
            broker: self.broker,
            topic: self.topic,
            partition: self.partition,
            schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        }
    }

    pub fn batch_size(self, batch_size: u32) -> Self {
        Self { batch_size, ..self }
    }

    pub fn per_second(self, per_second: Option<u32>) -> Self {
        Self { per_second, ..self }
    }

    pub fn throughput(self, throughput: Option<u32>) -> Self {
        Self { throughput, ..self }
    }

    pub fn producers(self, producers: u32) -> Self {
        Self { producers, ..self }
    }

    pub fn duration(self, duration: Option<Duration>) -> Self {
        Self { duration, ..self }
    }

    pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Self {
        Self {
            otlp_endpoint_url,
            ..self
        }
    }
}

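/// With the broker, topic, partition and schema registry all supplied, the
/// builder can produce a [`Generate`].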
impl Builder<Url, String, i32, Url> {
    pub fn build(self) -> Result<Generate> {
        Generate::try_from(Configuration {
            broker: self.broker,
            topic: self.topic,
            partition: self.partition,
            schema_registry: self.schema_registry,
            batch_size: self.batch_size,
            per_second: self.per_second,
            throughput: self.throughput,
            producers: self.producers,
            duration: self.duration,
            otlp_endpoint_url: self.otlp_endpoint_url,
        })
    }
}