1use std::{
20 fmt, io,
21 marker::PhantomData,
22 net::SocketAddr,
23 num::NonZeroU32,
24 pin::Pin,
25 result,
26 sync::{Arc, LazyLock, PoisonError},
27 time::{Duration, SystemTime},
28};
29
30use governor::{InsufficientCapacity, Jitter, Quota, RateLimiter};
31use nonzero_ext::nonzero;
32use opentelemetry::{
33 InstrumentationScope, KeyValue, global,
34 metrics::{Counter, Histogram, Meter},
35};
36use opentelemetry_otlp::ExporterBuildError;
37use opentelemetry_sdk::error::OTelSdkError;
38use opentelemetry_semantic_conventions::SCHEMA_URL;
39use rama::{
40 Context, Service,
41 error::{BoxError, OpaqueError},
42};
43use tansu_otel::meter_provider;
44use tansu_sans_io::{
45 ErrorCode,
46 produce_request::{PartitionProduceData, TopicProduceData},
47 record::{deflated, inflated},
48};
49use tansu_schema::{Generator as _, Registry, Schema};
50use tansu_service::{
51 api::produce::{ProduceRequest, ProduceResponse},
52 service::ApiClient,
53};
54use tokio::{
55 net::lookup_host,
56 signal::unix::{SignalKind, signal},
57 task::JoinSet,
58 time::sleep,
59};
60use tokio_util::sync::CancellationToken;
61use tracing::{Instrument, Level, debug, span};
62use url::Url;
63
64pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
65 global::meter_with_scope(
66 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
67 .with_version(env!("CARGO_PKG_VERSION"))
68 .with_schema_url(SCHEMA_URL)
69 .build(),
70 )
71});
72
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
74
/// Errors produced by the generator.
///
/// `Display` is implemented by hand for this type (no `#[error(...)]`
/// attributes are used), so the derive here supplies the `std::error::Error`
/// implementation and the `#[from]` conversions.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An API [`ErrorCode`].
    Api(ErrorCode),
    /// A boxed error.
    Box(#[from] BoxError),
    /// Building the OTLP exporter failed.
    ExporterBuild(#[from] ExporterBuildError),
    /// The rate limiter reported insufficient capacity for the request.
    InsufficientCapacity(#[from] InsufficientCapacity),
    /// An I/O error, held behind an [`Arc`].
    Io(Arc<io::Error>),
    /// An opaque error.
    Opaque(#[from] OpaqueError),
    /// An error from `tansu_otel`.
    Otel(#[from] tansu_otel::Error),
    /// An error from the OpenTelemetry SDK.
    OtelSdk(#[from] OTelSdkError),
    /// A lock was poisoned; the poisoned guard itself is discarded.
    Poison,
    /// An error from `tansu_sans_io`.
    Protocol(#[from] tansu_sans_io::Error),
    /// An error from `tansu_schema`, boxed.
    Schema(Box<tansu_schema::Error>),
    /// The schema registry holds no schema for the named topic.
    SchemaNotFoundForTopic(String),
    /// The URL (held as a string) could not be resolved to a socket address.
    UnknownHost(String),
    /// A URL failed to parse.
    Url(#[from] url::ParseError),
}
92
93impl<T> From<PoisonError<T>> for Error {
94 fn from(_value: PoisonError<T>) -> Self {
95 Self::Poison
96 }
97}
98
99impl From<tansu_schema::Error> for Error {
100 fn from(error: tansu_schema::Error) -> Self {
101 Self::Schema(Box::new(error))
102 }
103}
104
105impl From<io::Error> for Error {
106 fn from(value: io::Error) -> Self {
107 Self::Io(Arc::new(value))
108 }
109}
110
111impl fmt::Display for Error {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 write!(f, "{self:?}")
114 }
115}
116
/// Why a generator run was stopped before its producers finished on their own.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CancelKind {
    /// The interrupt signal was received.
    Interrupt,
    /// The terminate signal was received.
    Terminate,
    /// The configured run duration elapsed.
    Timeout,
}
123
/// Settings for a generator run.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Configuration {
    /// URL of the broker that produce requests are sent to.
    broker: Url,
    /// Topic to produce to; also the name used to look up the schema.
    topic: String,
    /// Partition index to produce to.
    partition: i32,
    /// URL from which the schema registry is constructed.
    schema_registry: Url,
    /// Records per produce request; a value of zero falls back to 10.
    batch_size: u32,
    /// Optional per-second rate limit, shared by all producers and consumed
    /// `batch_size` cells at a time; `None` or zero disables rate limiting.
    per_second: Option<u32>,
    /// Number of concurrent producer tasks to spawn.
    producers: u32,
    /// Optional run time after which the producers are cancelled.
    duration: Option<Duration>,
    /// Optional OTLP endpoint used to set up a meter provider.
    otlp_endpoint_url: Option<Url>,
}
136
/// A configured generator: the run settings together with the schema
/// registry built from them.
#[derive(Clone, Debug)]
pub struct Generate {
    /// Settings for this run.
    configuration: Configuration,
    /// Registry constructed from `configuration.schema_registry`.
    registry: Registry,
}
142
143impl TryFrom<Configuration> for Generate {
144 type Error = Error;
145
146 fn try_from(configuration: Configuration) -> Result<Self, Self::Error> {
147 Registry::try_from(&configuration.schema_registry)
148 .map(|registry| Self {
149 configuration,
150 registry,
151 })
152 .map_err(Into::into)
153 }
154}
155
156static DNS_LOOKUP_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
157 METER
158 .u64_histogram("dns_lookup_duration")
159 .with_unit("ms")
160 .with_description("DNS lookup latencies")
161 .build()
162});
163
164async fn host_port(url: &Url) -> Result<SocketAddr> {
165 if let Some(host) = url.host_str()
166 && let Some(port) = url.port()
167 {
168 let attributes = [KeyValue::new("url", url.to_string())];
169 let start = SystemTime::now();
170
171 let mut addresses = lookup_host(format!("{host}:{port}"))
172 .await
173 .inspect(|_| {
174 DNS_LOOKUP_DURATION.record(
175 start
176 .elapsed()
177 .map_or(0, |duration| duration.as_millis() as u64),
178 &attributes,
179 )
180 })?
181 .filter(|socket_addr| matches!(socket_addr, SocketAddr::V4(_)));
182
183 if let Some(socket_addr) = addresses.next().inspect(|socket_addr| debug!(?socket_addr)) {
184 return Ok(socket_addr);
185 }
186 }
187
188 Err(Error::UnknownHost(url.to_string()))
189}
190
191static GENERATE_PRODUCE_BATCH_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
192 METER
193 .u64_histogram("generate_produce_batch_duration")
194 .with_unit("ms")
195 .with_description("Generate a produce batch in milliseconds")
196 .build()
197});
198
199static PRODUCE_REQUEST_RESPONSE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
200 METER
201 .u64_histogram("produce_request_response_duration")
202 .with_unit("ms")
203 .with_description("Latency of receiving an produce response in milliseconds")
204 .build()
205});
206
207pub async fn produce(
208 broker: Url,
209 name: String,
210 index: i32,
211 schema: Schema,
212 batch_size: i32,
213) -> Result<()> {
214 debug!(%broker, %name, index, batch_size);
215
216 let attributes = [
217 KeyValue::new("topic", name.clone()),
218 KeyValue::new("partition", index.to_string()),
219 KeyValue::new("batch_size", batch_size.to_string()),
220 ];
221
222 let frame = {
223 let start = SystemTime::now();
224
225 let mut batch = inflated::Batch::builder();
226 let offset_deltas = 0..batch_size;
227
228 for offset_delta in offset_deltas {
229 batch = schema
230 .generate()
231 .map(|record| record.offset_delta(offset_delta))
232 .map(|record| batch.record(record))?;
233 }
234
235 batch
236 .last_offset_delta(batch_size)
237 .build()
238 .map(|batch| inflated::Frame {
239 batches: vec![batch],
240 })
241 .and_then(deflated::Frame::try_from)
242 .inspect(|_| {
243 GENERATE_PRODUCE_BATCH_DURATION.record(
244 start
245 .elapsed()
246 .map_or(0, |duration| duration.as_millis() as u64),
247 &attributes,
248 )
249 })?
250 };
251
252 let request = ProduceRequest {
253 topic_data: Some(
254 [TopicProduceData::default().name(name).partition_data(Some(
255 [PartitionProduceData::default()
256 .index(index)
257 .records(Some(frame))]
258 .into(),
259 ))]
260 .into(),
261 ),
262 ..Default::default()
263 };
264
265 let origin = host_port(&broker)
266 .await
267 .map(Into::into)
268 .map(ApiClient::new)
269 .inspect(|client| debug!(?client))?;
270
271 let response = {
272 let start = SystemTime::now();
273
274 let attributes = [];
275
276 origin
277 .serve(Context::default(), request.into())
278 .await
279 .and_then(|response| ProduceResponse::try_from(response).map_err(Into::into))
280 .inspect(|response| debug!(?response))
281 .inspect(|_| {
282 PRODUCE_REQUEST_RESPONSE_DURATION.record(
283 start
284 .elapsed()
285 .map_or(0, |duration| duration.as_millis() as u64),
286 &attributes,
287 )
288 })
289 }?;
290
291 assert!(
292 response
293 .responses
294 .unwrap_or_default()
295 .into_iter()
296 .all(|topic| {
297 topic
298 .partition_responses
299 .unwrap_or_default()
300 .iter()
301 .inspect(|partition| debug!(topic = %topic.name, ?partition))
302 .all(|partition| partition.error_code == i16::from(ErrorCode::None))
303 })
304 );
305
306 Ok(())
307}
308
309static RATE_LIMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
310 METER
311 .u64_histogram("rate_limit_duration")
312 .with_unit("ms")
313 .with_description("Rate limit latencies in milliseconds")
314 .build()
315});
316
317static PRODUCE_RECORD_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
318 METER
319 .u64_counter("produce_record_count")
320 .with_description("Produced record count")
321 .build()
322});
323
324static PRODUCE_API_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
325 METER
326 .u64_histogram("produce_duration")
327 .with_unit("ms")
328 .with_description("Produce API latencies in milliseconds")
329 .build()
330});
331
332impl Generate {
333 pub async fn main(self) -> Result<ErrorCode> {
334 let meter_provider = self
335 .configuration
336 .otlp_endpoint_url
337 .map_or(Ok(None), |otlp_endpoint_url| {
338 meter_provider(otlp_endpoint_url, env!("CARGO_PKG_NAME")).map(Some)
339 })?;
340
341 let Some(schema) = self.registry.schema(&self.configuration.topic).await? else {
342 return Err(Error::SchemaNotFoundForTopic(
343 self.configuration.topic.clone(),
344 ));
345 };
346
347 let mut interrupt_signal = signal(SignalKind::interrupt()).unwrap();
348 debug!(?interrupt_signal);
349
350 let mut terminate_signal = signal(SignalKind::terminate()).unwrap();
351 debug!(?terminate_signal);
352
353 let rate_limiter = self
354 .configuration
355 .per_second
356 .and_then(NonZeroU32::new)
357 .map(Quota::per_second)
358 .map(RateLimiter::direct)
359 .map(Arc::new)
360 .inspect(|rate_limiter| debug!(?rate_limiter));
361
362 let batch_size = NonZeroU32::new(self.configuration.batch_size)
363 .inspect(|batch_size| debug!(batch_size = batch_size.get()))
364 .unwrap_or(nonzero!(10u32));
365
366 let mut set = JoinSet::new();
367
368 let token = CancellationToken::new();
369
370 for producer in 0..self.configuration.producers {
371 let rate_limiter = rate_limiter.clone();
372 let schema = schema.clone();
373 let broker = self.configuration.broker.clone();
374 let topic = self.configuration.topic.clone();
375 let partition = self.configuration.partition;
376 let token = token.clone();
377
378 _ = set.spawn(async move {
379 let span = span!(Level::DEBUG, "producer", producer);
380
381
382 async move {
383 let attributes = [KeyValue::new("producer", producer.to_string())];
384
385 loop {
386 debug!(%broker, %topic, partition);
387
388 if let Some(ref rate_limiter) = rate_limiter {
389 let rate_limit_start = SystemTime::now();
390
391
392
393
394
395 tokio::select! {
396 cancelled = token.cancelled() => {
397 debug!(?cancelled);
398 break
399 },
400
401 Ok(_) = rate_limiter.until_n_ready_with_jitter(batch_size, Jitter::up_to(Duration::from_millis(50))) => {
402 RATE_LIMIT_DURATION.record(
403 rate_limit_start
404 .elapsed()
405 .map_or(0, |duration| duration.as_millis() as u64),
406 &attributes)
407
408 },
409 }
410 }
411
412 let produce_start = SystemTime::now();
413
414 tokio::select! {
415 cancelled = token.cancelled() => {
416 debug!(?cancelled);
417 break
418 },
419
420 Ok(_) = produce(broker.clone(), topic.clone(), partition, schema.clone(), batch_size.get() as i32) => {
421 PRODUCE_RECORD_COUNT.add(batch_size.get() as u64, &attributes);
422 PRODUCE_API_DURATION.record(produce_start.elapsed().map_or(0, |duration| duration.as_millis() as u64), &attributes);
423 },
424 }
425 }
426
427 }.instrument(span).await
428
429 });
430 }
431
432 let join_all = async {
433 while !set.is_empty() {
434 debug!(len = set.len());
435 _ = set.join_next().await;
436 }
437 };
438
439 let duration = self
440 .configuration
441 .duration
442 .map(sleep)
443 .map(Box::pin)
444 .map(|pinned| pinned as Pin<Box<dyn Future<Output = ()>>>)
445 .unwrap_or(Box::pin(std::future::pending()) as Pin<Box<dyn Future<Output = ()>>>);
446
447 let cancellation = tokio::select! {
448
449 timeout = duration => {
450 debug!(?timeout);
451 token.cancel();
452 Some(CancelKind::Timeout)
453 }
454
455 completed = join_all => {
456 debug!(?completed);
457 None
458 }
459
460 interrupt = interrupt_signal.recv() => {
461 debug!(?interrupt);
462 Some(CancelKind::Interrupt)
463 }
464
465 terminate = terminate_signal.recv() => {
466 debug!(?terminate);
467 Some(CancelKind::Terminate)
468 }
469
470 };
471
472 debug!(?cancellation);
473
474 if let Some(meter_provider) = meter_provider {
475 meter_provider
476 .force_flush()
477 .inspect(|force_flush| debug!(?force_flush))?;
478
479 meter_provider
480 .shutdown()
481 .inspect(|shutdown| debug!(?shutdown))?;
482 }
483
484 if let Some(CancelKind::Timeout) = cancellation {
485 sleep(Duration::from_secs(5)).await;
486 }
487
488 debug!(abort = set.len());
489 set.abort_all();
490
491 while !set.is_empty() {
492 _ = set.join_next().await;
493 }
494
495 Ok(ErrorCode::None)
496 }
497
498 pub fn builder()
499 -> Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>> {
500 Builder::default()
501 }
502}
503
/// Builder for [`Generate`].
///
/// The type parameters track which required settings have been supplied:
/// each starts as a `PhantomData` placeholder and becomes the concrete type
/// once the corresponding setter is called. `build` is only available when
/// all four are concrete.
#[derive(Clone, Debug)]
pub struct Builder<B, T, P, S> {
    /// Broker URL, or a placeholder until set.
    broker: B,
    /// Topic name, or a placeholder until set.
    topic: T,
    /// Partition index, or a placeholder until set.
    partition: P,
    /// Schema registry URL, or a placeholder until set.
    schema_registry: S,
    /// Records per produce request; defaults to 1.
    batch_size: u32,
    /// Optional per-second rate limit; defaults to `None`.
    per_second: Option<u32>,
    /// Number of producer tasks; defaults to 1.
    producers: u32,
    /// Optional run duration; defaults to `None`.
    duration: Option<Duration>,
    /// Optional OTLP endpoint URL; defaults to `None`.
    otlp_endpoint_url: Option<Url>,
}
516
517impl Default
518 for Builder<PhantomData<Url>, PhantomData<String>, PhantomData<i32>, PhantomData<Url>>
519{
520 fn default() -> Self {
521 Self {
522 broker: Default::default(),
523 topic: Default::default(),
524 partition: Default::default(),
525 schema_registry: Default::default(),
526 batch_size: 1,
527 per_second: None,
528 producers: 1,
529 duration: None,
530 otlp_endpoint_url: None,
531 }
532 }
533}
534
535impl<B, T, P, S> Builder<B, T, P, S> {
536 pub fn broker(self, broker: impl Into<Url>) -> Builder<Url, T, P, S> {
537 Builder {
538 broker: broker.into(),
539 topic: self.topic,
540 partition: self.partition,
541 schema_registry: self.schema_registry,
542 batch_size: self.batch_size,
543 per_second: self.per_second,
544 producers: self.producers,
545 duration: self.duration,
546 otlp_endpoint_url: self.otlp_endpoint_url,
547 }
548 }
549
550 pub fn topic(self, topic: impl Into<String>) -> Builder<B, String, P, S> {
551 Builder {
552 broker: self.broker,
553 topic: topic.into(),
554 partition: self.partition,
555 schema_registry: self.schema_registry,
556 batch_size: self.batch_size,
557 per_second: self.per_second,
558 producers: self.producers,
559 duration: self.duration,
560 otlp_endpoint_url: self.otlp_endpoint_url,
561 }
562 }
563
564 pub fn partition(self, partition: i32) -> Builder<B, T, i32, S> {
565 Builder {
566 broker: self.broker,
567 topic: self.topic,
568 partition,
569 schema_registry: self.schema_registry,
570 batch_size: self.batch_size,
571 per_second: self.per_second,
572 producers: self.producers,
573 duration: self.duration,
574 otlp_endpoint_url: self.otlp_endpoint_url,
575 }
576 }
577
578 pub fn schema_registry(self, schema_registry: Url) -> Builder<B, T, P, Url> {
579 Builder {
580 broker: self.broker,
581 topic: self.topic,
582 partition: self.partition,
583 schema_registry,
584 batch_size: self.batch_size,
585 per_second: self.per_second,
586 producers: self.producers,
587 duration: self.duration,
588 otlp_endpoint_url: self.otlp_endpoint_url,
589 }
590 }
591
592 pub fn batch_size(self, batch_size: u32) -> Builder<B, T, P, S> {
593 Builder {
594 broker: self.broker,
595 topic: self.topic,
596 partition: self.partition,
597 schema_registry: self.schema_registry,
598 batch_size,
599 per_second: self.per_second,
600 producers: self.producers,
601 duration: self.duration,
602 otlp_endpoint_url: self.otlp_endpoint_url,
603 }
604 }
605
606 pub fn per_second(self, per_second: Option<u32>) -> Builder<B, T, P, S> {
607 Builder {
608 broker: self.broker,
609 topic: self.topic,
610 partition: self.partition,
611 schema_registry: self.schema_registry,
612 batch_size: self.batch_size,
613 per_second,
614 producers: self.producers,
615 duration: self.duration,
616 otlp_endpoint_url: self.otlp_endpoint_url,
617 }
618 }
619
620 pub fn producers(self, producers: u32) -> Builder<B, T, P, S> {
621 Builder {
622 broker: self.broker,
623 topic: self.topic,
624 partition: self.partition,
625 schema_registry: self.schema_registry,
626 batch_size: self.batch_size,
627 per_second: self.per_second,
628 producers,
629 duration: self.duration,
630 otlp_endpoint_url: self.otlp_endpoint_url,
631 }
632 }
633
634 pub fn duration(self, duration: Option<Duration>) -> Builder<B, T, P, S> {
635 Builder {
636 broker: self.broker,
637 topic: self.topic,
638 partition: self.partition,
639 schema_registry: self.schema_registry,
640 batch_size: self.batch_size,
641 per_second: self.per_second,
642 producers: self.producers,
643 duration,
644 otlp_endpoint_url: self.otlp_endpoint_url,
645 }
646 }
647
648 pub fn otlp_endpoint_url(self, otlp_endpoint_url: Option<Url>) -> Builder<B, T, P, S> {
649 Builder {
650 broker: self.broker,
651 topic: self.topic,
652 partition: self.partition,
653 schema_registry: self.schema_registry,
654 batch_size: self.batch_size,
655 per_second: self.per_second,
656 producers: self.producers,
657 duration: self.duration,
658 otlp_endpoint_url,
659 }
660 }
661}
662
663impl Builder<Url, String, i32, Url> {
664 pub fn build(self) -> Result<Generate> {
665 Generate::try_from(Configuration {
666 broker: self.broker,
667 topic: self.topic,
668 partition: self.partition,
669 schema_registry: self.schema_registry,
670 batch_size: self.batch_size,
671 per_second: self.per_second,
672 producers: self.producers,
673 duration: self.duration,
674 otlp_endpoint_url: self.otlp_endpoint_url,
675 })
676 }
677}