lnc_client/standalone.rs
1//! Standalone Consumer Mode
2//!
3//! Provides a simplified, Kafka-like consumer API for independent consumption.
4//! In standalone mode, each consumer operates independently without coordination
5//! with other consumers. This is the simplest consumption pattern.
6//!
7//! # Example — name-based (recommended)
8//!
9//! ```rust,no_run
10//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
11//! use std::path::Path;
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//! // Pass a topic *name*; the client resolves it to a numeric ID automatically.
16//! let mut consumer = StandaloneConsumer::connect(
17//! "127.0.0.1:1992",
18//! StandaloneConfig::with_topic_name("my-consumer", "my-topic")
19//! .with_offset_dir(Path::new("/var/lib/lance/offsets")),
20//! ).await?;
21//!
22//! // Receive records
23//! loop {
24//! if let Some(records) = consumer.next_batch().await? {
25//! for record in records.data.chunks(256) {
26//! // Process record
27//! }
28//! consumer.commit().await?;
29//! }
30//! }
31//! }
32//! ```
33//!
34//! # Example — ID-based (legacy / backward-compatible)
35//!
36//! ```rust,no_run
37//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
38//! use std::path::Path;
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//! let mut consumer = StandaloneConsumer::connect(
43//! "127.0.0.1:1992",
44//! StandaloneConfig::new("my-consumer", 1)
45//! .with_offset_dir(Path::new("/var/lib/lance/offsets")),
46//! ).await?;
47//!
48//! loop {
49//! if let Some(records) = consumer.next_batch().await? {
50//! for record in records.data.chunks(256) {
51//! // Process record
52//! }
53//! consumer.commit().await?;
54//! }
55//! }
56//! }
57//! ```
58
59use std::collections::hash_map::DefaultHasher;
60use std::hash::{Hash, Hasher};
61use std::path::{Path, PathBuf};
62use std::sync::Arc;
63use std::time::{Duration, Instant};
64
65use crate::client::{ClientConfig, LanceClient};
66use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
67use crate::error::{Result, validate_topic_name};
68use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
69
70/// Configuration for standalone consumer mode
71///
72/// There are three ways to specify which topic to consume:
73///
74/// 1. **Name-based (recommended)** — pass only the topic name via
75/// [`StandaloneConfig::with_topic_name`]. The consumer resolves the name
76/// to a numeric ID automatically by calling `create_topic` (which is
77/// idempotent) on connect.
78///
79/// 2. **ID + name** — use [`StandaloneConfig::new_with_id`] when you already
80/// hold a resolved `TopicInfo` and want to skip the extra round-trip.
81///
82/// 3. **ID-only (legacy)** — use [`StandaloneConfig::new`] with a numeric ID
83/// directly. This preserves backward compatibility with existing callers.
84#[derive(Debug, Clone)]
85pub struct StandaloneConfig {
86 /// Identifier for this consumer (used for offset storage)
87 pub consumer_id: String,
88 /// Topic ID to consume from.
89 ///
90 /// When this is `0` and [`topic_name`](Self::topic_name) is `Some`, the consumer will
91 /// resolve the name to an ID on the first call to
92 /// [`StandaloneConsumer::connect`].
93 pub topic_id: u32,
94 /// Human-readable topic name.
95 ///
96 /// When set and `topic_id == 0`, `StandaloneConsumer::connect` resolves
97 /// the name to an ID via `create_topic` (idempotent).
98 pub topic_name: Option<String>,
99 /// Maximum bytes to fetch per poll
100 pub max_fetch_bytes: u32,
101 /// Starting position if no stored offset exists
102 pub start_position: SeekPosition,
103 /// Directory for offset persistence (None = in-memory only)
104 pub offset_dir: Option<PathBuf>,
105 /// Auto-commit interval (None = manual commit only)
106 pub auto_commit_interval: Option<Duration>,
107 /// Connection timeout
108 pub connect_timeout: Duration,
109 /// Poll timeout for blocking operations
110 pub poll_timeout: Duration,
111}
112
113impl StandaloneConfig {
114 /// Create a standalone config using a numeric topic ID.
115 ///
116 /// This is the **legacy constructor** and is kept for backward
117 /// compatibility. Prefer [`StandaloneConfig::with_topic_name`] for new
118 /// code — it lets the client resolve the name automatically and avoids
119 /// coupling call sites to numeric IDs.
120 pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
121 Self {
122 consumer_id: consumer_id.into(),
123 topic_id,
124 topic_name: None,
125 max_fetch_bytes: 1_048_576, // 1MB default
126 start_position: SeekPosition::Beginning,
127 offset_dir: None,
128 auto_commit_interval: Some(Duration::from_secs(5)),
129 connect_timeout: Duration::from_secs(30),
130 poll_timeout: Duration::from_millis(100),
131 }
132 }
133
134 /// Create a standalone config using only the topic **name**.
135 ///
136 /// When this config is passed to [`StandaloneConsumer::connect`], the
137 /// consumer calls `create_topic` (idempotent) to resolve the name to a
138 /// numeric ID before subscribing. No extra setup code is required on the
139 /// call site.
140 ///
141 /// The name must match `[a-zA-Z0-9-]+`; an error is returned at connect
142 /// time if validation fails.
143 ///
144 /// # Examples
145 ///
146 /// ```rust,no_run
147 /// use lnc_client::{StandaloneConsumer, StandaloneConfig};
148 ///
149 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
150 /// let mut consumer = StandaloneConsumer::connect(
151 /// "127.0.0.1:1992",
152 /// StandaloneConfig::with_topic_name("my-consumer", "rithmic-actions"),
153 /// ).await?;
154 /// # Ok(()) }
155 /// ```
156 pub fn with_topic_name(consumer_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
157 Self {
158 consumer_id: consumer_id.into(),
159 topic_id: 0,
160 topic_name: Some(topic_name.into()),
161 max_fetch_bytes: 1_048_576,
162 start_position: SeekPosition::Beginning,
163 offset_dir: None,
164 auto_commit_interval: Some(Duration::from_secs(5)),
165 connect_timeout: Duration::from_secs(30),
166 poll_timeout: Duration::from_millis(100),
167 }
168 }
169
170 /// Create a standalone config with both a topic name and a pre-resolved
171 /// numeric ID.
172 ///
173 /// Use this when you already have a [`TopicInfo`] from a prior
174 /// `create_topic` call and want to avoid the extra round-trip that
175 /// [`StandaloneConfig::with_topic_name`] performs on connect.
176 ///
177 /// [`TopicInfo`]: crate::TopicInfo
178 ///
179 /// # Examples
180 ///
181 /// ```rust,no_run
182 /// use lnc_client::{ClientConfig, LanceClient, StandaloneConsumer, StandaloneConfig};
183 ///
184 /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
185 /// let mut mgmt = LanceClient::connect(ClientConfig::new("127.0.0.1:1992")).await?;
186 /// let info = mgmt.create_topic("rithmic-actions").await?;
187 ///
188 /// let mut consumer = StandaloneConsumer::connect(
189 /// "127.0.0.1:1992",
190 /// StandaloneConfig::new_with_id("my-consumer", &info.name, info.id),
191 /// ).await?;
192 /// # Ok(()) }
193 /// ```
194 pub fn new_with_id(
195 consumer_id: impl Into<String>,
196 topic_name: impl Into<String>,
197 topic_id: u32,
198 ) -> Self {
199 Self {
200 consumer_id: consumer_id.into(),
201 topic_id,
202 topic_name: Some(topic_name.into()),
203 max_fetch_bytes: 1_048_576,
204 start_position: SeekPosition::Beginning,
205 offset_dir: None,
206 auto_commit_interval: Some(Duration::from_secs(5)),
207 connect_timeout: Duration::from_secs(30),
208 poll_timeout: Duration::from_millis(100),
209 }
210 }
211
212 /// Set the consumer ID for offset tracking
213 pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
214 self.consumer_id = id.into();
215 self
216 }
217
218 /// Set the maximum bytes to fetch per poll
219 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
220 self.max_fetch_bytes = bytes;
221 self
222 }
223
224 /// Set the starting position when no stored offset exists
225 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
226 self.start_position = position;
227 self
228 }
229
230 /// Set the directory for offset persistence
231 /// If not set, offsets are stored in memory only (lost on restart)
232 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
233 self.offset_dir = Some(dir.to_path_buf());
234 self
235 }
236
237 /// Set auto-commit interval (None to disable auto-commit)
238 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
239 self.auto_commit_interval = interval;
240 self
241 }
242
243 /// Disable auto-commit (manual commit only)
244 pub fn with_manual_commit(mut self) -> Self {
245 self.auto_commit_interval = None;
246 self
247 }
248
249 /// Set connection timeout
250 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
251 self.connect_timeout = timeout;
252 self
253 }
254
255 /// Set poll timeout
256 pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
257 self.poll_timeout = timeout;
258 self
259 }
260}
261
262/// Standalone consumer for independent consumption
263///
264/// This consumer operates independently without coordination with other consumers.
265/// It manages its own offset tracking and persistence.
266///
267/// # Kafka Equivalence
268///
269/// This is similar to a Kafka consumer with `enable.auto.commit=true` and no
270/// consumer group coordination. Each StandaloneConsumer instance tracks its
271/// own progress independently.
272pub struct StandaloneConsumer {
273 inner: StreamingConsumer,
274 config: StandaloneConfig,
275 offset_store: Arc<dyn OffsetStore>,
276 last_commit_time: Instant,
277 pending_offset: u64,
278 committed_offset: u64,
279}
280
281impl StandaloneConsumer {
282 /// Connect to a LANCE server and start consuming.
283 ///
284 /// This will:
285 /// 1. Validate the topic name if one is supplied (must match `[a-zA-Z0-9-]+`)
286 /// 2. Resolve the topic name to a numeric ID via `create_topic` (idempotent)
287 /// when the config was created with [`StandaloneConfig::with_topic_name`]
288 /// 3. Establish the consumer connection
289 /// 4. Load any previously committed offset from storage
290 /// 5. Subscribe to the topic starting from the stored offset (or `start_position`)
291 ///
292 /// # Errors
293 ///
294 /// Returns an error if:
295 /// - The topic name is invalid (contains characters outside `[a-zA-Z0-9-]`)
296 /// - Connection to the server fails
297 /// - Topic name resolution fails
298 /// - Offset store cannot be initialized
299 /// - Subscription fails
300 pub async fn connect(addr: &str, mut config: StandaloneConfig) -> Result<Self> {
301 let mut client_config = ClientConfig::new(addr);
302 client_config.connect_timeout = config.connect_timeout;
303
304 let mut client = LanceClient::connect(client_config).await?;
305
306 // Resolve topic name to ID when the caller used the name-based constructor.
307 if config.topic_id == 0 {
308 if let Some(ref name) = config.topic_name.clone() {
309 validate_topic_name(name)?;
310 let topic_info = client.create_topic(name).await?;
311 config.topic_id = topic_info.id;
312 }
313 }
314
315 Self::from_client(client, config).await
316 }
317
318 /// Create a standalone consumer from an existing client connection
319 pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
320 // Generate numeric consumer ID from string name
321 let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
322
323 // Initialize offset store
324 let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
325 Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
326 } else {
327 Arc::new(MemoryOffsetStore::new())
328 };
329
330 // Load stored offset
331 let stored_offset = offset_store
332 .load(config.topic_id, numeric_consumer_id)
333 .ok()
334 .flatten();
335
336 // Determine starting position
337 let start_position = if let Some(offset) = stored_offset {
338 SeekPosition::Offset(offset)
339 } else {
340 config.start_position
341 };
342
343 // Create streaming consumer config
344 let streaming_config = StreamingConsumerConfig::new(config.topic_id)
345 .with_max_batch_bytes(config.max_fetch_bytes)
346 .with_start_position(start_position)
347 .with_auto_commit_interval(0); // We handle auto-commit ourselves
348
349 let mut inner = StreamingConsumer::new(client, streaming_config);
350
351 // Start the subscription
352 inner.start().await?;
353
354 let current_offset = inner.current_offset();
355
356 Ok(Self {
357 inner,
358 config,
359 offset_store,
360 last_commit_time: Instant::now(),
361 pending_offset: current_offset,
362 committed_offset: stored_offset.unwrap_or(0),
363 })
364 }
365
366 /// Receive the next batch for an active subscription.
367 ///
368 /// Returns `Ok(Some(PollResult))` if records are available,
369 /// `Ok(None)` if no records are ready (non-blocking behavior).
370 ///
371 /// This method may trigger auto-commit if configured.
372 pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
373 // Check for auto-commit
374 self.maybe_auto_commit().await?;
375
376 // Pull from the inner streaming consumer
377 let result = self.inner.next_batch().await?;
378
379 if let Some(ref poll_result) = result {
380 self.pending_offset = poll_result.current_offset;
381 }
382
383 Ok(result)
384 }
385
386 /// Primary consume interface alias.
387 #[inline]
388 pub async fn consume(&mut self) -> Result<Option<PollResult>> {
389 self.next_batch().await
390 }
391
392 /// Compatibility wrapper for callers still using polling terminology.
393 #[inline]
394 pub async fn poll(&mut self) -> Result<Option<PollResult>> {
395 self.next_batch().await
396 }
397
398 /// Poll for records, blocking until data is available or timeout
399 ///
400 /// This is a convenience method that retries polling until records
401 /// are available or the configured poll_timeout is reached.
402 pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
403 let deadline = Instant::now() + timeout;
404
405 while Instant::now() < deadline {
406 if let Some(result) = self.next_batch().await? {
407 return Ok(Some(result));
408 }
409 // Small sleep to avoid busy-waiting
410 tokio::time::sleep(Duration::from_millis(10)).await;
411 }
412
413 Ok(None)
414 }
415
416 /// Commit the current offset
417 ///
418 /// This persists the current consumption progress to the offset store.
419 /// If the consumer crashes and restarts, it will resume from this offset.
420 pub async fn commit(&mut self) -> Result<()> {
421 self.commit_offset(self.pending_offset).await
422 }
423
424 /// Commit a specific offset
425 ///
426 /// Use this for fine-grained control over exactly which offset is committed.
427 pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
428 // Generate numeric consumer ID
429 let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
430
431 // Persist to offset store
432 self.offset_store
433 .save(self.config.topic_id, numeric_consumer_id, offset)?;
434
435 self.committed_offset = offset;
436 self.last_commit_time = Instant::now();
437
438 // Also commit to server for visibility (optional, for monitoring)
439 let _ = self.inner.commit().await;
440
441 Ok(())
442 }
443
444 /// Seek to a specific position
445 ///
446 /// This changes the consumption position. The next poll will start
447 /// from the new position.
448 pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
449 let offset = self.inner.seek(position).await?;
450 self.pending_offset = offset;
451 Ok(offset)
452 }
453
454 /// Get the current consumption offset (may not be committed yet)
455 pub fn current_offset(&self) -> u64 {
456 self.pending_offset
457 }
458
459 /// Get the last committed offset
460 pub fn committed_offset(&self) -> u64 {
461 self.committed_offset
462 }
463
464 /// Get the consumer ID
465 pub fn consumer_id(&self) -> &str {
466 &self.config.consumer_id
467 }
468
469 /// Get the topic ID being consumed
470 pub fn topic_id(&self) -> u32 {
471 self.config.topic_id
472 }
473
474 /// Check if the consumer is actively subscribed
475 pub fn is_subscribed(&self) -> bool {
476 self.inner.is_subscribed()
477 }
478
479 /// Get a reference to the underlying client
480 pub fn client(&self) -> &LanceClient {
481 self.inner.client()
482 }
483
484 /// Stop the consumer and release resources
485 ///
486 /// This will:
487 /// 1. Commit any pending offset (if auto-commit is enabled)
488 /// 2. Unsubscribe from the topic
489 /// 3. Close the connection
490 pub async fn close(mut self) -> Result<LanceClient> {
491 // Final commit if we have pending changes
492 if self.pending_offset > self.committed_offset {
493 let _ = self.commit().await;
494 }
495
496 self.inner.into_client().await
497 }
498
499 /// Hash a string consumer ID to a numeric ID
500 fn hash_consumer_id(consumer_id: &str) -> u64 {
501 let mut hasher = DefaultHasher::new();
502 consumer_id.hash(&mut hasher);
503 hasher.finish()
504 }
505
506 /// Check and perform auto-commit if interval has elapsed
507 async fn maybe_auto_commit(&mut self) -> Result<()> {
508 if let Some(interval) = self.config.auto_commit_interval {
509 if self.last_commit_time.elapsed() >= interval {
510 if self.pending_offset > self.committed_offset {
511 self.commit().await?;
512 } else {
513 // Update time even if no commit needed
514 self.last_commit_time = Instant::now();
515 }
516 }
517 }
518 Ok(())
519 }
520}
521
522impl std::fmt::Debug for StandaloneConsumer {
523 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524 f.debug_struct("StandaloneConsumer")
525 .field("consumer_id", &self.config.consumer_id)
526 .field("topic_id", &self.config.topic_id)
527 .field("pending_offset", &self.pending_offset)
528 .field("committed_offset", &self.committed_offset)
529 .field("is_subscribed", &self.inner.is_subscribed())
530 .finish()
531 }
532}
533
534/// Builder for creating multiple standalone consumers with shared configuration
535pub struct StandaloneConsumerBuilder {
536 addr: String,
537 base_config: StandaloneConfig,
538}
539
540impl StandaloneConsumerBuilder {
541 /// Create a new builder
542 pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
543 Self {
544 addr: addr.into(),
545 base_config: StandaloneConfig::new(consumer_id, 0),
546 }
547 }
548
549 /// Set the offset storage directory
550 pub fn with_offset_dir(mut self, dir: &Path) -> Self {
551 self.base_config = self.base_config.with_offset_dir(dir);
552 self
553 }
554
555 /// Set the max fetch bytes
556 pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
557 self.base_config = self.base_config.with_max_fetch_bytes(bytes);
558 self
559 }
560
561 /// Set the start position for new consumers
562 pub fn with_start_position(mut self, position: SeekPosition) -> Self {
563 self.base_config = self.base_config.with_start_position(position);
564 self
565 }
566
567 /// Set auto-commit interval
568 pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
569 self.base_config = self.base_config.with_auto_commit_interval(interval);
570 self
571 }
572
573 /// Build a consumer for a specific topic ID (legacy).
574 pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
575 let mut config = self.base_config.clone();
576 config.topic_id = topic_id;
577 StandaloneConsumer::connect(&self.addr, config).await
578 }
579
580 /// Build a consumer for a topic identified by **name**.
581 ///
582 /// The name is validated and then resolved to a numeric ID via
583 /// `create_topic` (idempotent) before the consumer connects.
584 ///
585 /// # Errors
586 ///
587 /// Returns [`ClientError::InvalidTopicName`](crate::ClientError::InvalidTopicName) if `topic_name` contains
588 /// characters outside `[a-zA-Z0-9-]` or is empty.
589 pub async fn build_for_topic_name(
590 &self,
591 topic_name: impl Into<String>,
592 ) -> Result<StandaloneConsumer> {
593 let name = topic_name.into();
594 validate_topic_name(&name)?;
595 let mut config = self.base_config.clone();
596 config.topic_id = 0;
597 config.topic_name = Some(name);
598 StandaloneConsumer::connect(&self.addr, config).await
599 }
600}
601
602#[cfg(test)]
603#[allow(clippy::unwrap_used)]
604mod tests {
605 use super::*;
606 use crate::error::validate_topic_name;
607
608 #[test]
609 fn test_standalone_config_defaults() {
610 let config = StandaloneConfig::new("test-consumer", 1);
611
612 assert_eq!(config.consumer_id, "test-consumer");
613 assert_eq!(config.topic_id, 1);
614 assert_eq!(config.max_fetch_bytes, 1_048_576);
615 assert!(config.offset_dir.is_none());
616 assert!(config.auto_commit_interval.is_some());
617 assert!(config.topic_name.is_none());
618 }
619
620 #[test]
621 fn test_standalone_config_builder() {
622 let config = StandaloneConfig::new("test", 1)
623 .with_max_fetch_bytes(512 * 1024)
624 .with_offset_dir(Path::new("/tmp/offsets"))
625 .with_manual_commit()
626 .with_start_position(SeekPosition::End);
627
628 assert_eq!(config.max_fetch_bytes, 512 * 1024);
629 assert!(config.offset_dir.is_some());
630 assert!(config.auto_commit_interval.is_none());
631 }
632
633 #[test]
634 fn test_standalone_config_with_auto_commit() {
635 let config = StandaloneConfig::new("test", 1)
636 .with_auto_commit_interval(Some(Duration::from_secs(10)));
637
638 assert_eq!(config.auto_commit_interval, Some(Duration::from_secs(10)));
639 }
640
641 // -------------------------------------------------------------------------
642 // Name-based constructor tests
643 // -------------------------------------------------------------------------
644
645 #[test]
646 fn test_with_topic_name_sets_name_and_zero_id() {
647 let config = StandaloneConfig::with_topic_name("my-consumer", "rithmic-actions");
648
649 assert_eq!(config.consumer_id, "my-consumer");
650 assert_eq!(config.topic_name.as_deref(), Some("rithmic-actions"));
651 // topic_id starts at 0 — resolved on connect
652 assert_eq!(config.topic_id, 0);
653 }
654
655 #[test]
656 fn test_new_with_id_sets_both_name_and_id() {
657 let config = StandaloneConfig::new_with_id("my-consumer", "rithmic-actions", 42);
658
659 assert_eq!(config.consumer_id, "my-consumer");
660 assert_eq!(config.topic_name.as_deref(), Some("rithmic-actions"));
661 assert_eq!(config.topic_id, 42);
662 }
663
664 #[test]
665 fn test_with_topic_name_builder_chain() {
666 let config = StandaloneConfig::with_topic_name("consumer-1", "data-stream")
667 .with_max_fetch_bytes(256 * 1024)
668 .with_manual_commit()
669 .with_start_position(SeekPosition::End);
670
671 assert_eq!(config.topic_name.as_deref(), Some("data-stream"));
672 assert_eq!(config.topic_id, 0);
673 assert_eq!(config.max_fetch_bytes, 256 * 1024);
674 assert!(config.auto_commit_interval.is_none());
675 }
676
677 // -------------------------------------------------------------------------
678 // validate_topic_name tests
679 // -------------------------------------------------------------------------
680
681 #[test]
682 fn test_validate_topic_name_accepts_valid_names() {
683 let valid = &[
684 "rithmic-actions",
685 "data",
686 "topic-123",
687 "MyTopic",
688 "ABC",
689 "a-b-c-1-2-3",
690 ];
691 for name in valid {
692 assert!(
693 validate_topic_name(name).is_ok(),
694 "Expected {:?} to be valid",
695 name
696 );
697 }
698 }
699
700 #[test]
701 fn test_validate_topic_name_rejects_empty() {
702 let result = validate_topic_name("");
703 assert!(
704 matches!(result, Err(crate::ClientError::InvalidTopicName(_))),
705 "Empty name should be invalid"
706 );
707 }
708
709 #[test]
710 fn test_validate_topic_name_rejects_spaces() {
711 let result = validate_topic_name("bad topic");
712 assert!(
713 matches!(result, Err(crate::ClientError::InvalidTopicName(_))),
714 "Name with space should be invalid"
715 );
716 }
717
718 #[test]
719 fn test_validate_topic_name_rejects_special_chars() {
720 let invalid = &[
721 "topic!",
722 "topic/sub",
723 "topic.name",
724 "topic_name",
725 "topic@v2",
726 ];
727 for name in invalid {
728 assert!(
729 matches!(
730 validate_topic_name(name),
731 Err(crate::ClientError::InvalidTopicName(_))
732 ),
733 "Expected {:?} to be invalid",
734 name
735 );
736 }
737 }
738
739 #[test]
740 fn test_validate_topic_name_rejects_unicode() {
741 let result = validate_topic_name("tópico");
742 assert!(
743 matches!(result, Err(crate::ClientError::InvalidTopicName(_))),
744 "Name with unicode should be invalid"
745 );
746 }
747}