Skip to main content

lnc_client/
standalone.rs

1//! Standalone Consumer Mode
2//!
3//! Provides a simplified, Kafka-like consumer API for independent consumption.
4//! In standalone mode, each consumer operates independently without coordination
5//! with other consumers. This is the simplest consumption pattern.
6//!
7//! # Example — name-based (recommended)
8//!
9//! ```rust,no_run
10//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
11//! use std::path::Path;
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//!     // Pass a topic *name*; the client resolves it to a numeric ID automatically.
16//!     let mut consumer = StandaloneConsumer::connect(
17//!         "127.0.0.1:1992",
18//!         StandaloneConfig::with_topic_name("my-consumer", "my-topic")
19//!             .with_offset_dir(Path::new("/var/lib/lance/offsets")),
20//!     ).await?;
21//!
22//!     // Receive records
23//!     loop {
24//!         if let Some(records) = consumer.next_batch().await? {
25//!             for record in records.data.chunks(256) {
26//!                 // Process record
27//!             }
28//!             consumer.commit().await?;
29//!         }
30//!     }
31//! }
32//! ```
33//!
34//! # Example — ID-based (legacy / backward-compatible)
35//!
36//! ```rust,no_run
37//! use lnc_client::{StandaloneConsumer, StandaloneConfig};
38//! use std::path::Path;
39//!
40//! #[tokio::main]
41//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
42//!     let mut consumer = StandaloneConsumer::connect(
43//!         "127.0.0.1:1992",
44//!         StandaloneConfig::new("my-consumer", 1)
45//!             .with_offset_dir(Path::new("/var/lib/lance/offsets")),
46//!     ).await?;
47//!
48//!     loop {
49//!         if let Some(records) = consumer.next_batch().await? {
50//!             for record in records.data.chunks(256) {
51//!                 // Process record
52//!             }
53//!             consumer.commit().await?;
54//!         }
55//!     }
56//! }
57//! ```
58
59use std::collections::hash_map::DefaultHasher;
60use std::hash::{Hash, Hasher};
61use std::path::{Path, PathBuf};
62use std::sync::Arc;
63use std::time::{Duration, Instant};
64
65use crate::client::{ClientConfig, LanceClient};
66use crate::consumer::{PollResult, SeekPosition, StreamingConsumer, StreamingConsumerConfig};
67use crate::error::{Result, validate_topic_name};
68use crate::offset::{LockFileOffsetStore, MemoryOffsetStore, OffsetStore};
69
70/// Configuration for standalone consumer mode
71///
72/// There are three ways to specify which topic to consume:
73///
74/// 1. **Name-based (recommended)** — pass only the topic name via
75///    [`StandaloneConfig::with_topic_name`].  The consumer resolves the name
76///    to a numeric ID automatically by calling `create_topic` (which is
77///    idempotent) on connect.
78///
79/// 2. **ID + name** — use [`StandaloneConfig::new_with_id`] when you already
80///    hold a resolved `TopicInfo` and want to skip the extra round-trip.
81///
82/// 3. **ID-only (legacy)** — use [`StandaloneConfig::new`] with a numeric ID
83///    directly.  This preserves backward compatibility with existing callers.
84#[derive(Debug, Clone)]
85pub struct StandaloneConfig {
86    /// Identifier for this consumer (used for offset storage)
87    pub consumer_id: String,
88    /// Topic ID to consume from.
89    ///
90    /// When this is `0` and [`topic_name`](Self::topic_name) is `Some`, the consumer will
91    /// resolve the name to an ID on the first call to
92    /// [`StandaloneConsumer::connect`].
93    pub topic_id: u32,
94    /// Human-readable topic name.
95    ///
96    /// When set and `topic_id == 0`, `StandaloneConsumer::connect` resolves
97    /// the name to an ID via `create_topic` (idempotent).
98    pub topic_name: Option<String>,
99    /// Maximum bytes to fetch per poll
100    pub max_fetch_bytes: u32,
101    /// Starting position if no stored offset exists
102    pub start_position: SeekPosition,
103    /// Directory for offset persistence (None = in-memory only)
104    pub offset_dir: Option<PathBuf>,
105    /// Auto-commit interval (None = manual commit only)
106    pub auto_commit_interval: Option<Duration>,
107    /// Connection timeout
108    pub connect_timeout: Duration,
109    /// Poll timeout for blocking operations
110    pub poll_timeout: Duration,
111}
112
113impl StandaloneConfig {
114    /// Create a standalone config using a numeric topic ID.
115    ///
116    /// This is the **legacy constructor** and is kept for backward
117    /// compatibility. Prefer [`StandaloneConfig::with_topic_name`] for new
118    /// code — it lets the client resolve the name automatically and avoids
119    /// coupling call sites to numeric IDs.
120    pub fn new(consumer_id: impl Into<String>, topic_id: u32) -> Self {
121        Self {
122            consumer_id: consumer_id.into(),
123            topic_id,
124            topic_name: None,
125            max_fetch_bytes: 1_048_576, // 1MB default
126            start_position: SeekPosition::Beginning,
127            offset_dir: None,
128            auto_commit_interval: Some(Duration::from_secs(5)),
129            connect_timeout: Duration::from_secs(30),
130            poll_timeout: Duration::from_millis(100),
131        }
132    }
133
134    /// Create a standalone config using only the topic **name**.
135    ///
136    /// When this config is passed to [`StandaloneConsumer::connect`], the
137    /// consumer calls `create_topic` (idempotent) to resolve the name to a
138    /// numeric ID before subscribing.  No extra setup code is required on the
139    /// call site.
140    ///
141    /// The name must match `[a-zA-Z0-9-]+`; an error is returned at connect
142    /// time if validation fails.
143    ///
144    /// # Examples
145    ///
146    /// ```rust,no_run
147    /// use lnc_client::{StandaloneConsumer, StandaloneConfig};
148    ///
149    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
150    /// let mut consumer = StandaloneConsumer::connect(
151    ///     "127.0.0.1:1992",
152    ///     StandaloneConfig::with_topic_name("my-consumer", "rithmic-actions"),
153    /// ).await?;
154    /// # Ok(()) }
155    /// ```
156    pub fn with_topic_name(consumer_id: impl Into<String>, topic_name: impl Into<String>) -> Self {
157        Self {
158            consumer_id: consumer_id.into(),
159            topic_id: 0,
160            topic_name: Some(topic_name.into()),
161            max_fetch_bytes: 1_048_576,
162            start_position: SeekPosition::Beginning,
163            offset_dir: None,
164            auto_commit_interval: Some(Duration::from_secs(5)),
165            connect_timeout: Duration::from_secs(30),
166            poll_timeout: Duration::from_millis(100),
167        }
168    }
169
170    /// Create a standalone config with both a topic name and a pre-resolved
171    /// numeric ID.
172    ///
173    /// Use this when you already have a [`TopicInfo`] from a prior
174    /// `create_topic` call and want to avoid the extra round-trip that
175    /// [`StandaloneConfig::with_topic_name`] performs on connect.
176    ///
177    /// [`TopicInfo`]: crate::TopicInfo
178    ///
179    /// # Examples
180    ///
181    /// ```rust,no_run
182    /// use lnc_client::{ClientConfig, LanceClient, StandaloneConsumer, StandaloneConfig};
183    ///
184    /// # #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> {
185    /// let mut mgmt = LanceClient::connect(ClientConfig::new("127.0.0.1:1992")).await?;
186    /// let info = mgmt.create_topic("rithmic-actions").await?;
187    ///
188    /// let mut consumer = StandaloneConsumer::connect(
189    ///     "127.0.0.1:1992",
190    ///     StandaloneConfig::new_with_id("my-consumer", &info.name, info.id),
191    /// ).await?;
192    /// # Ok(()) }
193    /// ```
194    pub fn new_with_id(
195        consumer_id: impl Into<String>,
196        topic_name: impl Into<String>,
197        topic_id: u32,
198    ) -> Self {
199        Self {
200            consumer_id: consumer_id.into(),
201            topic_id,
202            topic_name: Some(topic_name.into()),
203            max_fetch_bytes: 1_048_576,
204            start_position: SeekPosition::Beginning,
205            offset_dir: None,
206            auto_commit_interval: Some(Duration::from_secs(5)),
207            connect_timeout: Duration::from_secs(30),
208            poll_timeout: Duration::from_millis(100),
209        }
210    }
211
212    /// Set the consumer ID for offset tracking
213    pub fn with_consumer_id(mut self, id: impl Into<String>) -> Self {
214        self.consumer_id = id.into();
215        self
216    }
217
218    /// Set the maximum bytes to fetch per poll
219    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
220        self.max_fetch_bytes = bytes;
221        self
222    }
223
224    /// Set the starting position when no stored offset exists
225    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
226        self.start_position = position;
227        self
228    }
229
230    /// Set the directory for offset persistence
231    /// If not set, offsets are stored in memory only (lost on restart)
232    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
233        self.offset_dir = Some(dir.to_path_buf());
234        self
235    }
236
237    /// Set auto-commit interval (None to disable auto-commit)
238    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
239        self.auto_commit_interval = interval;
240        self
241    }
242
243    /// Disable auto-commit (manual commit only)
244    pub fn with_manual_commit(mut self) -> Self {
245        self.auto_commit_interval = None;
246        self
247    }
248
249    /// Set connection timeout
250    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
251        self.connect_timeout = timeout;
252        self
253    }
254
255    /// Set poll timeout
256    pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
257        self.poll_timeout = timeout;
258        self
259    }
260}
261
262/// Standalone consumer for independent consumption
263///
264/// This consumer operates independently without coordination with other consumers.
265/// It manages its own offset tracking and persistence.
266///
267/// # Kafka Equivalence
268///
269/// This is similar to a Kafka consumer with `enable.auto.commit=true` and no
270/// consumer group coordination. Each StandaloneConsumer instance tracks its
271/// own progress independently.
272pub struct StandaloneConsumer {
273    inner: StreamingConsumer,
274    config: StandaloneConfig,
275    offset_store: Arc<dyn OffsetStore>,
276    last_commit_time: Instant,
277    pending_offset: u64,
278    committed_offset: u64,
279}
280
281impl StandaloneConsumer {
282    /// Connect to a LANCE server and start consuming.
283    ///
284    /// This will:
285    /// 1. Validate the topic name if one is supplied (must match `[a-zA-Z0-9-]+`)
286    /// 2. Resolve the topic name to a numeric ID via `create_topic` (idempotent)
287    ///    when the config was created with [`StandaloneConfig::with_topic_name`]
288    /// 3. Establish the consumer connection
289    /// 4. Load any previously committed offset from storage
290    /// 5. Subscribe to the topic starting from the stored offset (or `start_position`)
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if:
295    /// - The topic name is invalid (contains characters outside `[a-zA-Z0-9-]`)
296    /// - Connection to the server fails
297    /// - Topic name resolution fails
298    /// - Offset store cannot be initialized
299    /// - Subscription fails
300    pub async fn connect(addr: &str, mut config: StandaloneConfig) -> Result<Self> {
301        let mut client_config = ClientConfig::new(addr);
302        client_config.connect_timeout = config.connect_timeout;
303
304        let mut client = LanceClient::connect(client_config).await?;
305
306        // Resolve topic name to ID when the caller used the name-based constructor.
307        if config.topic_id == 0 {
308            if let Some(ref name) = config.topic_name.clone() {
309                validate_topic_name(name)?;
310                let topic_info = client.create_topic(name).await?;
311                config.topic_id = topic_info.id;
312            }
313        }
314
315        Self::from_client(client, config).await
316    }
317
318    /// Create a standalone consumer from an existing client connection
319    pub async fn from_client(client: LanceClient, config: StandaloneConfig) -> Result<Self> {
320        // Generate numeric consumer ID from string name
321        let numeric_consumer_id = Self::hash_consumer_id(&config.consumer_id);
322
323        // Initialize offset store
324        let offset_store: Arc<dyn OffsetStore> = if let Some(ref dir) = config.offset_dir {
325            Arc::new(LockFileOffsetStore::open(dir, &config.consumer_id)?)
326        } else {
327            Arc::new(MemoryOffsetStore::new())
328        };
329
330        // Load stored offset
331        let stored_offset = offset_store
332            .load(config.topic_id, numeric_consumer_id)
333            .ok()
334            .flatten();
335
336        // Determine starting position
337        let start_position = if let Some(offset) = stored_offset {
338            SeekPosition::Offset(offset)
339        } else {
340            config.start_position
341        };
342
343        // Create streaming consumer config
344        let streaming_config = StreamingConsumerConfig::new(config.topic_id)
345            .with_max_batch_bytes(config.max_fetch_bytes)
346            .with_start_position(start_position)
347            .with_auto_commit_interval(0); // We handle auto-commit ourselves
348
349        let mut inner = StreamingConsumer::new(client, streaming_config);
350
351        // Start the subscription
352        inner.start().await?;
353
354        let current_offset = inner.current_offset();
355
356        Ok(Self {
357            inner,
358            config,
359            offset_store,
360            last_commit_time: Instant::now(),
361            pending_offset: current_offset,
362            committed_offset: stored_offset.unwrap_or(0),
363        })
364    }
365
366    /// Receive the next batch for an active subscription.
367    ///
368    /// Returns `Ok(Some(PollResult))` if records are available,
369    /// `Ok(None)` if no records are ready (non-blocking behavior).
370    ///
371    /// This method may trigger auto-commit if configured.
372    pub async fn next_batch(&mut self) -> Result<Option<PollResult>> {
373        // Check for auto-commit
374        self.maybe_auto_commit().await?;
375
376        // Pull from the inner streaming consumer
377        let result = self.inner.next_batch().await?;
378
379        if let Some(ref poll_result) = result {
380            self.pending_offset = poll_result.current_offset;
381        }
382
383        Ok(result)
384    }
385
386    /// Primary consume interface alias.
387    #[inline]
388    pub async fn consume(&mut self) -> Result<Option<PollResult>> {
389        self.next_batch().await
390    }
391
392    /// Compatibility wrapper for callers still using polling terminology.
393    #[inline]
394    pub async fn poll(&mut self) -> Result<Option<PollResult>> {
395        self.next_batch().await
396    }
397
398    /// Poll for records, blocking until data is available or timeout
399    ///
400    /// This is a convenience method that retries polling until records
401    /// are available or the configured poll_timeout is reached.
402    pub async fn poll_timeout(&mut self, timeout: Duration) -> Result<Option<PollResult>> {
403        let deadline = Instant::now() + timeout;
404
405        while Instant::now() < deadline {
406            if let Some(result) = self.next_batch().await? {
407                return Ok(Some(result));
408            }
409            // Small sleep to avoid busy-waiting
410            tokio::time::sleep(Duration::from_millis(10)).await;
411        }
412
413        Ok(None)
414    }
415
416    /// Commit the current offset
417    ///
418    /// This persists the current consumption progress to the offset store.
419    /// If the consumer crashes and restarts, it will resume from this offset.
420    pub async fn commit(&mut self) -> Result<()> {
421        self.commit_offset(self.pending_offset).await
422    }
423
424    /// Commit a specific offset
425    ///
426    /// Use this for fine-grained control over exactly which offset is committed.
427    pub async fn commit_offset(&mut self, offset: u64) -> Result<()> {
428        // Generate numeric consumer ID
429        let numeric_consumer_id = Self::hash_consumer_id(&self.config.consumer_id);
430
431        // Persist to offset store
432        self.offset_store
433            .save(self.config.topic_id, numeric_consumer_id, offset)?;
434
435        self.committed_offset = offset;
436        self.last_commit_time = Instant::now();
437
438        // Also commit to server for visibility (optional, for monitoring)
439        let _ = self.inner.commit().await;
440
441        Ok(())
442    }
443
444    /// Seek to a specific position
445    ///
446    /// This changes the consumption position. The next poll will start
447    /// from the new position.
448    pub async fn seek(&mut self, position: SeekPosition) -> Result<u64> {
449        let offset = self.inner.seek(position).await?;
450        self.pending_offset = offset;
451        Ok(offset)
452    }
453
454    /// Get the current consumption offset (may not be committed yet)
455    pub fn current_offset(&self) -> u64 {
456        self.pending_offset
457    }
458
459    /// Get the last committed offset
460    pub fn committed_offset(&self) -> u64 {
461        self.committed_offset
462    }
463
464    /// Get the consumer ID
465    pub fn consumer_id(&self) -> &str {
466        &self.config.consumer_id
467    }
468
469    /// Get the topic ID being consumed
470    pub fn topic_id(&self) -> u32 {
471        self.config.topic_id
472    }
473
474    /// Check if the consumer is actively subscribed
475    pub fn is_subscribed(&self) -> bool {
476        self.inner.is_subscribed()
477    }
478
479    /// Get a reference to the underlying client
480    pub fn client(&self) -> &LanceClient {
481        self.inner.client()
482    }
483
484    /// Stop the consumer and release resources
485    ///
486    /// This will:
487    /// 1. Commit any pending offset (if auto-commit is enabled)
488    /// 2. Unsubscribe from the topic
489    /// 3. Close the connection
490    pub async fn close(mut self) -> Result<LanceClient> {
491        // Final commit if we have pending changes
492        if self.pending_offset > self.committed_offset {
493            let _ = self.commit().await;
494        }
495
496        self.inner.into_client().await
497    }
498
499    /// Hash a string consumer ID to a numeric ID
500    fn hash_consumer_id(consumer_id: &str) -> u64 {
501        let mut hasher = DefaultHasher::new();
502        consumer_id.hash(&mut hasher);
503        hasher.finish()
504    }
505
506    /// Check and perform auto-commit if interval has elapsed
507    async fn maybe_auto_commit(&mut self) -> Result<()> {
508        if let Some(interval) = self.config.auto_commit_interval {
509            if self.last_commit_time.elapsed() >= interval {
510                if self.pending_offset > self.committed_offset {
511                    self.commit().await?;
512                } else {
513                    // Update time even if no commit needed
514                    self.last_commit_time = Instant::now();
515                }
516            }
517        }
518        Ok(())
519    }
520}
521
522impl std::fmt::Debug for StandaloneConsumer {
523    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
524        f.debug_struct("StandaloneConsumer")
525            .field("consumer_id", &self.config.consumer_id)
526            .field("topic_id", &self.config.topic_id)
527            .field("pending_offset", &self.pending_offset)
528            .field("committed_offset", &self.committed_offset)
529            .field("is_subscribed", &self.inner.is_subscribed())
530            .finish()
531    }
532}
533
534/// Builder for creating multiple standalone consumers with shared configuration
535pub struct StandaloneConsumerBuilder {
536    addr: String,
537    base_config: StandaloneConfig,
538}
539
540impl StandaloneConsumerBuilder {
541    /// Create a new builder
542    pub fn new(addr: impl Into<String>, consumer_id: impl Into<String>) -> Self {
543        Self {
544            addr: addr.into(),
545            base_config: StandaloneConfig::new(consumer_id, 0),
546        }
547    }
548
549    /// Set the offset storage directory
550    pub fn with_offset_dir(mut self, dir: &Path) -> Self {
551        self.base_config = self.base_config.with_offset_dir(dir);
552        self
553    }
554
555    /// Set the max fetch bytes
556    pub fn with_max_fetch_bytes(mut self, bytes: u32) -> Self {
557        self.base_config = self.base_config.with_max_fetch_bytes(bytes);
558        self
559    }
560
561    /// Set the start position for new consumers
562    pub fn with_start_position(mut self, position: SeekPosition) -> Self {
563        self.base_config = self.base_config.with_start_position(position);
564        self
565    }
566
567    /// Set auto-commit interval
568    pub fn with_auto_commit_interval(mut self, interval: Option<Duration>) -> Self {
569        self.base_config = self.base_config.with_auto_commit_interval(interval);
570        self
571    }
572
573    /// Build a consumer for a specific topic ID (legacy).
574    pub async fn build_for_topic(&self, topic_id: u32) -> Result<StandaloneConsumer> {
575        let mut config = self.base_config.clone();
576        config.topic_id = topic_id;
577        StandaloneConsumer::connect(&self.addr, config).await
578    }
579
580    /// Build a consumer for a topic identified by **name**.
581    ///
582    /// The name is validated and then resolved to a numeric ID via
583    /// `create_topic` (idempotent) before the consumer connects.
584    ///
585    /// # Errors
586    ///
587    /// Returns [`ClientError::InvalidTopicName`](crate::ClientError::InvalidTopicName) if `topic_name` contains
588    /// characters outside `[a-zA-Z0-9-]` or is empty.
589    pub async fn build_for_topic_name(
590        &self,
591        topic_name: impl Into<String>,
592    ) -> Result<StandaloneConsumer> {
593        let name = topic_name.into();
594        validate_topic_name(&name)?;
595        let mut config = self.base_config.clone();
596        config.topic_id = 0;
597        config.topic_name = Some(name);
598        StandaloneConsumer::connect(&self.addr, config).await
599    }
600}
601
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::error::validate_topic_name;

    /// True when `validate_topic_name` rejects `name` with
    /// `ClientError::InvalidTopicName`.
    fn is_rejected(name: &str) -> bool {
        matches!(
            validate_topic_name(name),
            Err(crate::ClientError::InvalidTopicName(_))
        )
    }

    /// True when `validate_topic_name` accepts `name`.
    fn is_accepted(name: &str) -> bool {
        validate_topic_name(name).is_ok()
    }

    // -------------------------------------------------------------------------
    // ID-based constructor and builder-method tests
    // -------------------------------------------------------------------------

    #[test]
    fn test_standalone_config_defaults() {
        let cfg = StandaloneConfig::new("test-consumer", 1);

        assert_eq!(cfg.consumer_id, "test-consumer");
        assert_eq!(cfg.topic_id, 1);
        assert_eq!(cfg.max_fetch_bytes, 1_048_576);
        assert!(cfg.offset_dir.is_none());
        assert!(cfg.auto_commit_interval.is_some());
        assert!(cfg.topic_name.is_none());
    }

    #[test]
    fn test_standalone_config_builder() {
        let base = StandaloneConfig::new("test", 1);
        let cfg = base
            .with_max_fetch_bytes(512 * 1024)
            .with_offset_dir(Path::new("/tmp/offsets"))
            .with_manual_commit()
            .with_start_position(SeekPosition::End);

        assert_eq!(cfg.max_fetch_bytes, 512 * 1024);
        assert!(cfg.offset_dir.is_some());
        assert!(cfg.auto_commit_interval.is_none());
    }

    #[test]
    fn test_standalone_config_with_auto_commit() {
        let ten_seconds = Some(Duration::from_secs(10));
        let cfg = StandaloneConfig::new("test", 1).with_auto_commit_interval(ten_seconds);

        assert_eq!(cfg.auto_commit_interval, Some(Duration::from_secs(10)));
    }

    // -------------------------------------------------------------------------
    // Name-based constructor tests
    // -------------------------------------------------------------------------

    #[test]
    fn test_with_topic_name_sets_name_and_zero_id() {
        let cfg = StandaloneConfig::with_topic_name("my-consumer", "rithmic-actions");

        assert_eq!(cfg.consumer_id, "my-consumer");
        assert_eq!(cfg.topic_name.as_deref(), Some("rithmic-actions"));
        // The ID stays 0 until connect resolves the name.
        assert_eq!(cfg.topic_id, 0);
    }

    #[test]
    fn test_new_with_id_sets_both_name_and_id() {
        let cfg = StandaloneConfig::new_with_id("my-consumer", "rithmic-actions", 42);

        assert_eq!(cfg.consumer_id, "my-consumer");
        assert_eq!(cfg.topic_name.as_deref(), Some("rithmic-actions"));
        assert_eq!(cfg.topic_id, 42);
    }

    #[test]
    fn test_with_topic_name_builder_chain() {
        let base = StandaloneConfig::with_topic_name("consumer-1", "data-stream");
        let cfg = base
            .with_max_fetch_bytes(256 * 1024)
            .with_manual_commit()
            .with_start_position(SeekPosition::End);

        assert_eq!(cfg.topic_name.as_deref(), Some("data-stream"));
        assert_eq!(cfg.topic_id, 0);
        assert_eq!(cfg.max_fetch_bytes, 256 * 1024);
        assert!(cfg.auto_commit_interval.is_none());
    }

    // -------------------------------------------------------------------------
    // validate_topic_name tests
    // -------------------------------------------------------------------------

    #[test]
    fn test_validate_topic_name_accepts_valid_names() {
        let valid = &[
            "rithmic-actions",
            "data",
            "topic-123",
            "MyTopic",
            "ABC",
            "a-b-c-1-2-3",
        ];
        valid.iter().for_each(|name| {
            assert!(is_accepted(name), "Expected {:?} to be valid", name);
        });
    }

    #[test]
    fn test_validate_topic_name_rejects_empty() {
        assert!(is_rejected(""), "Empty name should be invalid");
    }

    #[test]
    fn test_validate_topic_name_rejects_spaces() {
        assert!(is_rejected("bad topic"), "Name with space should be invalid");
    }

    #[test]
    fn test_validate_topic_name_rejects_special_chars() {
        let invalid = &[
            "topic!",
            "topic/sub",
            "topic.name",
            "topic_name",
            "topic@v2",
        ];
        invalid.iter().for_each(|name| {
            assert!(is_rejected(name), "Expected {:?} to be invalid", name);
        });
    }

    #[test]
    fn test_validate_topic_name_rejects_unicode() {
        assert!(is_rejected("tópico"), "Name with unicode should be invalid");
    }
}