Skip to main content

drasi_lib/
lib_core.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{anyhow, Result};
16use log::{info, warn};
17use std::sync::Arc;
18use tokio::sync::RwLock;
19
20use crate::channels::*;
21use crate::config::{DrasiLibConfig, RuntimeConfig};
22use crate::inspection::InspectionAPI;
23use crate::lifecycle::LifecycleManager;
24use crate::managers::ComponentLogRegistry;
25use crate::queries::QueryManager;
26use crate::reactions::ReactionManager;
27use crate::sources::SourceManager;
28use crate::state_guard::StateGuard;
29use drasi_core::middleware::MiddlewareTypeRegistry;
30
31/// Core Drasi Server for continuous query processing
32///
33/// `DrasiLib` is the main entry point for embedding Drasi functionality in your application.
34/// It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions
35/// (output destinations) with a reactive event-driven architecture.
36///
37/// # Architecture
38///
39/// - **Sources**: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
40/// - **Queries**: Continuous Cypher or GQL queries that process data changes in real-time
41/// - **Reactions**: Output destinations that receive query results (HTTP, gRPC, Application, Log)
42///
43/// # Lifecycle States
44///
45/// The server progresses through these states:
46/// 1. **Created** (via `builder()`)
47/// 2. **Initialized** (one-time setup, automatic with builder)
48/// 3. **Running** (after `start()`)
49/// 4. **Stopped** (after `stop()`, can be restarted)
50///
51/// Components (sources, queries, reactions) have independent lifecycle states:
52/// - `Stopped`: Component exists but is not processing
53/// - `Starting`: Component is initializing
54/// - `Running`: Component is actively processing
55/// - `Stopping`: Component is shutting down
56///
57/// # Thread Safety
58///
59/// `DrasiLib` is `Clone` (all clones share the same underlying state) and all methods
60/// are thread-safe. You can safely share clones across threads and call methods concurrently.
61///
62/// # Examples
63///
64/// ## Builder Pattern
65///
66/// ```ignore
67/// use drasi_lib::{DrasiLib, Query};
68///
69/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
70/// let core = DrasiLib::builder()
71///     .with_id("my-server")
72///     .with_source(my_source)  // Pre-built source instance
73///     .with_query(
74///         Query::cypher("my-query")
75///             .query("MATCH (n:Person) RETURN n.name, n.age")
76///             .from_source("events")
77///             .auto_start(true)
78///             .build()
79///     )
80///     .with_reaction(my_reaction)  // Pre-built reaction instance
81///     .build()
82///     .await?;
83///
84/// // Start all auto-start components
85/// core.start().await?;
86///
87/// // List and inspect components
88/// let sources = core.list_sources().await?;
89/// let queries = core.list_queries().await?;
90///
91/// // Stop server
92/// core.stop().await?;
93/// # Ok(())
94/// # }
95/// ```
96///
97/// ## Dynamic Runtime Configuration
98///
99/// ```ignore
100/// use drasi_lib::{DrasiLib, Query};
101///
102/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
103/// let core = DrasiLib::builder()
104///     .with_id("dynamic-server")
105///     .build()
106///     .await?;
107///
108/// core.start().await?;
109///
110/// // Add components at runtime
111/// core.add_source(new_source_instance).await?;
112///
113/// core.add_query(
114///     Query::cypher("new-query")
115///         .query("MATCH (n) RETURN n")
116///         .from_source("new-source")
117///         .auto_start(true)
118///         .build()
119/// ).await?;
120///
121/// // Start/stop individual components
122/// core.stop_query("new-query").await?;
123/// core.start_query("new-query").await?;
124///
125/// // Remove components
126/// core.remove_query("new-query").await?;
127/// core.remove_source("new-source", false).await?;
128/// # Ok(())
129/// # }
130/// ```
131///
132/// ## Restart Behavior
133///
134/// When you call `stop()` and then `start()` again, only components with `auto_start=true`
135/// will be started. Components that were manually started (with `auto_start=false`) will
136/// remain stopped:
137///
138/// ```no_run
139/// # use drasi_lib::DrasiLib;
140/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141/// # let core = DrasiLib::builder().build().await?;
142/// core.start().await?;
143/// // ... only auto_start=true components are running ...
144///
145/// core.stop().await?;
146/// // ... all components stopped ...
147///
148/// core.start().await?;
149/// // ... only auto_start=true components restarted ...
150/// # Ok(())
151/// # }
152/// ```
153pub struct DrasiLib {
154    pub(crate) config: Arc<RuntimeConfig>,
155    pub(crate) source_manager: Arc<SourceManager>,
156    pub(crate) query_manager: Arc<QueryManager>,
157    pub(crate) reaction_manager: Arc<ReactionManager>,
158    pub(crate) running: Arc<RwLock<bool>>,
159    pub(crate) state_guard: StateGuard,
160    // Inspection API for querying server state
161    pub(crate) inspection: InspectionAPI,
162    // Lifecycle manager for orchestrating component lifecycle
163    pub(crate) lifecycle: Arc<RwLock<LifecycleManager>>,
164    // Middleware registry for source middleware
165    pub(crate) middleware_registry: Arc<MiddlewareTypeRegistry>,
166    // Component log registry for live log streaming
167    pub(crate) log_registry: Arc<ComponentLogRegistry>,
168}
169
170impl Clone for DrasiLib {
171    fn clone(&self) -> Self {
172        Self {
173            config: Arc::clone(&self.config),
174            source_manager: Arc::clone(&self.source_manager),
175            query_manager: Arc::clone(&self.query_manager),
176            reaction_manager: Arc::clone(&self.reaction_manager),
177            running: Arc::clone(&self.running),
178            state_guard: self.state_guard.clone(),
179            inspection: self.inspection.clone(),
180            lifecycle: Arc::clone(&self.lifecycle),
181            middleware_registry: Arc::clone(&self.middleware_registry),
182            log_registry: Arc::clone(&self.log_registry),
183        }
184    }
185}
186
187impl DrasiLib {
188    // ============================================================================
189    // Construction and Initialization
190    // ============================================================================
191
192    /// Create an Arc-wrapped reference to self
193    ///
194    /// Since DrasiLib contains all Arc-wrapped fields, cloning is cheap
195    /// (just increments ref counts), but this helper makes the intent clearer
196    /// and provides a single place to document this pattern.
197    pub(crate) fn as_arc(&self) -> Arc<Self> {
198        Arc::new(self.clone())
199    }
200
201    /// Internal constructor - creates uninitialized server
202    /// Use `builder()` instead
203    pub(crate) fn new(config: Arc<RuntimeConfig>) -> Self {
204        let (channels, receivers) = EventChannels::new();
205
206        // Use the shared global log registry.
207        // Since tracing uses a single global subscriber, all DrasiLib instances
208        // share the same log registry. This ensures logs are properly routed
209        // regardless of how many DrasiLib instances are created.
210        let log_registry = crate::managers::get_or_init_global_registry();
211
212        // Get the instance ID from config for log routing
213        let instance_id = config.id.clone();
214
215        let source_manager = Arc::new(SourceManager::new(
216            &instance_id,
217            channels.component_event_tx.clone(),
218            log_registry.clone(),
219        ));
220
221        // Initialize middleware registry and register all standard middleware factories
222        let mut middleware_registry = MiddlewareTypeRegistry::new();
223
224        #[cfg(feature = "middleware-jq")]
225        middleware_registry.register(Arc::new(drasi_middleware::jq::JQFactory::new()));
226
227        #[cfg(feature = "middleware-map")]
228        middleware_registry.register(Arc::new(drasi_middleware::map::MapFactory::new()));
229
230        #[cfg(feature = "middleware-unwind")]
231        middleware_registry.register(Arc::new(drasi_middleware::unwind::UnwindFactory::new()));
232
233        #[cfg(feature = "middleware-relabel")]
234        middleware_registry.register(Arc::new(
235            drasi_middleware::relabel::RelabelMiddlewareFactory::new(),
236        ));
237
238        #[cfg(feature = "middleware-decoder")]
239        middleware_registry.register(Arc::new(drasi_middleware::decoder::DecoderFactory::new()));
240
241        #[cfg(feature = "middleware-parse-json")]
242        middleware_registry.register(Arc::new(
243            drasi_middleware::parse_json::ParseJsonFactory::new(),
244        ));
245
246        #[cfg(feature = "middleware-promote")]
247        middleware_registry.register(Arc::new(
248            drasi_middleware::promote::PromoteMiddlewareFactory::new(),
249        ));
250
251        let middleware_registry = Arc::new(middleware_registry);
252
253        let query_manager = Arc::new(QueryManager::new(
254            &instance_id,
255            channels.component_event_tx.clone(),
256            source_manager.clone(),
257            config.index_factory.clone(),
258            middleware_registry.clone(),
259            log_registry.clone(),
260        ));
261
262        let reaction_manager = Arc::new(ReactionManager::new(
263            &instance_id,
264            channels.component_event_tx.clone(),
265            log_registry.clone(),
266        ));
267
268        let state_guard = StateGuard::new();
269
270        let inspection = InspectionAPI::new(
271            source_manager.clone(),
272            query_manager.clone(),
273            reaction_manager.clone(),
274            state_guard.clone(),
275            config.clone(),
276        );
277
278        let lifecycle = Arc::new(RwLock::new(LifecycleManager::new(
279            config.clone(),
280            source_manager.clone(),
281            query_manager.clone(),
282            reaction_manager.clone(),
283            Some(receivers),
284        )));
285
286        Self {
287            config,
288            source_manager,
289            query_manager,
290            reaction_manager,
291            running: Arc::new(RwLock::new(false)),
292            state_guard,
293            inspection,
294            lifecycle,
295            middleware_registry,
296            log_registry,
297        }
298    }
299
300    /// Internal initialization - performs one-time setup
301    /// This is called internally by builder and config loaders
302    pub(crate) async fn initialize(&mut self) -> Result<()> {
303        let already_initialized = self.state_guard.is_initialized().await;
304        if already_initialized {
305            info!("Server already initialized, skipping initialization");
306            return Ok(());
307        }
308
309        info!("Initializing Drasi Server Core");
310
311        // Inject QueryProvider into ReactionManager
312        // This allows reactions to access queries when they start
313        let query_provider: Arc<dyn crate::reactions::QueryProvider> = self.as_arc();
314        self.reaction_manager
315            .inject_query_provider(query_provider)
316            .await;
317
318        // Inject StateStoreProvider into SourceManager and ReactionManager
319        // This allows sources and reactions to persist state
320        let state_store = self.config.state_store_provider.clone();
321        self.source_manager
322            .inject_state_store(state_store.clone())
323            .await;
324        self.reaction_manager.inject_state_store(state_store).await;
325
326        // Load configuration
327        let lifecycle = self.lifecycle.read().await;
328        lifecycle.load_configuration().await?;
329        drop(lifecycle);
330
331        // Start event processors (one-time)
332        let mut lifecycle = self.lifecycle.write().await;
333        lifecycle.start_event_processors().await;
334        drop(lifecycle);
335
336        self.state_guard.mark_initialized().await;
337        info!("Drasi Server Core initialized successfully");
338        Ok(())
339    }
340
341    // ============================================================================
342    // Lifecycle Operations (start/stop)
343    // ============================================================================
344
345    /// Start the server and all auto-start components
346    ///
347    /// This starts all components (sources, queries, reactions) that have `auto_start` set to `true`,
348    /// as well as any components that were running when `stop()` was last called.
349    ///
350    /// Components are started in dependency order: Sources → Queries → Reactions
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if:
355    /// * The server is not initialized (`DrasiError::InvalidState`)
356    /// * The server is already running (`anyhow::Error`)
357    /// * Any component fails to start (propagated from component)
358    ///
359    /// # Examples
360    ///
361    /// ```no_run
362    /// # use drasi_lib::DrasiLib;
363    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
364    /// let core = DrasiLib::builder()
365    ///     .with_id("my-server")
366    ///     .build()
367    ///     .await?;
368    ///
369    /// // Start server and all auto-start components
370    /// core.start().await?;
371    ///
372    /// assert!(core.is_running().await);
373    /// # Ok(())
374    /// # }
375    /// ```
376    pub async fn start(&self) -> Result<()> {
377        let mut running = self.running.write().await;
378        if *running {
379            warn!("Server is already running");
380            return Err(anyhow!("Server is already running"));
381        }
382
383        info!("Starting Drasi Server Core");
384
385        // Ensure initialized
386        if !self.state_guard.is_initialized().await {
387            return Err(anyhow!("Server must be initialized before starting"));
388        }
389
390        // Start all configured components
391        let lifecycle = self.lifecycle.read().await;
392        lifecycle.start_components().await?;
393
394        *running = true;
395        info!("Drasi Server Core started successfully");
396
397        Ok(())
398    }
399
400    /// Stop the server and all running components
401    ///
402    /// This stops all currently running components (sources, queries, reactions).
403    /// Components are stopped in reverse dependency order: Reactions → Queries → Sources
404    ///
405    /// On the next `start()`, only components with `auto_start=true` will be restarted.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if:
410    /// * The server is not running (`anyhow::Error`)
411    /// * Any component fails to stop (logged as error, but doesn't prevent other components from stopping)
412    ///
413    /// # Examples
414    ///
415    /// ```no_run
416    /// # use drasi_lib::DrasiLib;
417    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
418    /// # let core = DrasiLib::builder().build().await?;
419    /// # core.start().await?;
420    /// // Stop server and all running components
421    /// core.stop().await?;
422    ///
423    /// assert!(!core.is_running().await);
424    ///
425    /// // Only auto_start=true components will be restarted
426    /// core.start().await?;
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub async fn stop(&self) -> Result<()> {
431        let mut running = self.running.write().await;
432        if !*running {
433            warn!("Server is already stopped");
434            return Err(anyhow!("Server is already stopped"));
435        }
436
437        info!("Stopping Drasi Server Core");
438
439        // Stop all components
440        let lifecycle = self.lifecycle.read().await;
441        lifecycle.stop_all_components().await?;
442
443        *running = false;
444        info!("Drasi Server Core stopped successfully");
445
446        Ok(())
447    }
448
449    // ============================================================================
450    // Handle Access (for advanced usage)
451    // ============================================================================
452
453    /// Get direct access to the query manager (advanced usage)
454    ///
455    /// This provides low-level access to the query manager for advanced scenarios.
456    /// Most users should use the higher-level methods like `get_query_info()`,
457    /// `start_query()`, etc. instead.
458    ///
459    /// # Thread Safety
460    ///
461    /// The returned reference is thread-safe and can be used across threads.
462    pub fn query_manager(&self) -> &QueryManager {
463        &self.query_manager
464    }
465
466    /// Get access to the middleware registry
467    ///
468    /// Returns a reference to the middleware type registry that contains all registered
469    /// middleware factories. The registry is pre-populated with all standard middleware
470    /// types (jq, map, unwind, relabel, decoder, parse_json, promote).
471    ///
472    /// # Thread Safety
473    ///
474    /// The returned Arc can be cloned and used across threads.
475    ///
476    /// # Examples
477    ///
478    /// ```no_run
479    /// # use drasi_lib::DrasiLib;
480    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
481    /// let core = DrasiLib::builder().build().await?;
482    /// let registry = core.middleware_registry();
483    /// // Use registry to create middleware instances
484    /// # Ok(())
485    /// # }
486    /// ```
487    pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry> {
488        Arc::clone(&self.middleware_registry)
489    }
490
491    // ============================================================================
492    // Configuration Snapshot
493    // ============================================================================
494
495    /// Get a complete configuration snapshot of all components
496    ///
497    /// Returns the full server configuration including all queries with their complete configurations.
498    /// Note: Sources and reactions are now instance-based and not stored in config.
499    /// Use `list_sources()` and `list_reactions()` for runtime information about these components.
500    ///
501    /// # Example
502    /// ```no_run
503    /// # use drasi_lib::DrasiLib;
504    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
505    /// let config = core.get_current_config().await?;
506    /// println!("Server has {} queries", config.queries.len());
507    /// # Ok(())
508    /// # }
509    /// ```
510    pub async fn get_current_config(&self) -> crate::error::Result<DrasiLibConfig> {
511        self.inspection.get_current_config().await
512    }
513
514    // ============================================================================
515    // Builder and Config File Loading
516    // ============================================================================
517
518    /// Create a builder for configuring a new DrasiLib instance.
519    ///
520    /// The builder provides a fluent API for adding queries and source/reaction instances.
521    /// Note: Sources and reactions are now instance-based. Use `with_source()` and `with_reaction()`
522    /// to add pre-built instances.
523    ///
524    /// # Example
525    /// ```no_run
526    /// use drasi_lib::{DrasiLib, Query};
527    ///
528    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
529    /// let core = DrasiLib::builder()
530    ///     .with_id("my-server")
531    ///     .with_query(
532    ///         Query::cypher("my-query")
533    ///             .query("MATCH (n) RETURN n")
534    ///             .from_source("events")
535    ///             .build()
536    ///     )
537    ///     // Use .with_source(source_instance) and .with_reaction(reaction_instance)
538    ///     // for pre-built source and reaction instances
539    ///     .build()
540    ///     .await?;
541    ///
542    /// core.start().await?;
543    /// # Ok(())
544    /// # }
545    /// ```
546    pub fn builder() -> crate::builder::DrasiLibBuilder {
547        crate::builder::DrasiLibBuilder::new()
548    }
549
550    // ============================================================================
551    // Server Status
552    // ============================================================================
553
554    /// Check if the server is currently running
555    ///
556    /// Returns `true` if `start()` has been called and the server is actively processing,
557    /// `false` if the server is stopped or has not been started yet.
558    ///
559    /// # Thread Safety
560    ///
561    /// This method is thread-safe and can be called concurrently.
562    ///
563    /// # Examples
564    ///
565    /// ```no_run
566    /// # use drasi_lib::DrasiLib;
567    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
568    /// let core = DrasiLib::builder().build().await?;
569    ///
570    /// assert!(!core.is_running().await); // Not started yet
571    ///
572    /// core.start().await?;
573    /// assert!(core.is_running().await); // Now running
574    ///
575    /// core.stop().await?;
576    /// assert!(!core.is_running().await); // Stopped again
577    /// # Ok(())
578    /// # }
579    /// ```
580    pub async fn is_running(&self) -> bool {
581        *self.running.read().await
582    }
583
584    /// Get the runtime configuration
585    ///
586    /// Returns a reference to the immutable runtime configuration containing server settings
587    /// and all component configurations. This is the configuration that was provided during
588    /// initialization (via builder, config file, or config string).
589    ///
590    /// For a current snapshot of the configuration including runtime additions, use
591    /// [`get_current_config()`](Self::get_current_config) instead.
592    ///
593    /// # Thread Safety
594    ///
595    /// This method is thread-safe and can be called concurrently.
596    ///
597    /// # Examples
598    ///
599    /// ```no_run
600    /// # use drasi_lib::DrasiLib;
601    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
602    /// let core = DrasiLib::builder()
603    ///     .with_id("my-server")
604    ///     .build()
605    ///     .await?;
606    ///
607    /// let config = core.get_config();
608    /// println!("Server ID: {}", config.id);
609    /// println!("Number of queries: {}", config.queries.len());
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub fn get_config(&self) -> &RuntimeConfig {
614        &self.config
615    }
616}
617
618// ============================================================================
619// QueryProvider Trait Implementation
620// ============================================================================
621
622// Implement QueryProvider trait for DrasiLib
623// This breaks the circular dependency by providing a minimal interface for reactions
624#[async_trait::async_trait]
625impl crate::reactions::QueryProvider for DrasiLib {
626    async fn get_query_instance(&self, id: &str) -> Result<Arc<dyn crate::queries::Query>> {
627        self.query_manager
628            .get_query_instance(id)
629            .await
630            .map_err(|e| anyhow::anyhow!(e))
631    }
632}
633
634// ============================================================================
635// Tests
636// ============================================================================
637
638#[cfg(test)]
639mod tests {
640    use super::*;
641
642    async fn create_test_server() -> DrasiLib {
643        DrasiLib::builder()
644            .with_id("test-server")
645            .build()
646            .await
647            .expect("Failed to build server")
648    }
649
650    #[tokio::test]
651    async fn test_middleware_registry_is_initialized() {
652        let core = create_test_server().await;
653
654        let registry = core.middleware_registry();
655
656        // Verify that middleware factories are registered when their features are enabled
657        #[cfg(feature = "middleware-jq")]
658        assert!(
659            registry.get("jq").is_some(),
660            "JQ factory should be registered"
661        );
662        #[cfg(feature = "middleware-map")]
663        assert!(
664            registry.get("map").is_some(),
665            "Map factory should be registered"
666        );
667        #[cfg(feature = "middleware-unwind")]
668        assert!(
669            registry.get("unwind").is_some(),
670            "Unwind factory should be registered"
671        );
672        #[cfg(feature = "middleware-relabel")]
673        assert!(
674            registry.get("relabel").is_some(),
675            "Relabel factory should be registered"
676        );
677        #[cfg(feature = "middleware-decoder")]
678        assert!(
679            registry.get("decoder").is_some(),
680            "Decoder factory should be registered"
681        );
682        #[cfg(feature = "middleware-parse-json")]
683        assert!(
684            registry.get("parse_json").is_some(),
685            "ParseJson factory should be registered"
686        );
687        #[cfg(feature = "middleware-promote")]
688        assert!(
689            registry.get("promote").is_some(),
690            "Promote factory should be registered"
691        );
692    }
693
694    #[tokio::test]
695    async fn test_middleware_registry_arc_sharing() {
696        let core = create_test_server().await;
697
698        let registry1 = core.middleware_registry();
699        let registry2 = core.middleware_registry();
700
701        // Both Arc instances should point to the same underlying registry
702        // We can't directly test Arc equality, but we can verify both work
703        // Test with any available middleware feature
704        #[cfg(feature = "middleware-jq")]
705        {
706            assert!(registry1.get("jq").is_some());
707            assert!(registry2.get("jq").is_some());
708        }
709        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
710        {
711            assert!(registry1.get("map").is_some());
712            assert!(registry2.get("map").is_some());
713        }
714        #[cfg(all(
715            feature = "middleware-decoder",
716            not(feature = "middleware-jq"),
717            not(feature = "middleware-map")
718        ))]
719        {
720            assert!(registry1.get("decoder").is_some());
721            assert!(registry2.get("decoder").is_some());
722        }
723    }
724
725    #[tokio::test]
726    async fn test_middleware_registry_accessible_before_start() {
727        let core = create_test_server().await;
728
729        // Should be accessible even before server is started
730        assert!(!core.is_running().await);
731        let registry = core.middleware_registry();
732
733        // Verify registry is accessible (test with any available middleware)
734        #[cfg(feature = "middleware-jq")]
735        assert!(registry.get("jq").is_some());
736        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
737        assert!(registry.get("map").is_some());
738
739        // If no middleware features are enabled, just verify the registry exists
740        #[cfg(not(any(
741            feature = "middleware-jq",
742            feature = "middleware-map",
743            feature = "middleware-decoder",
744            feature = "middleware-parse-json",
745            feature = "middleware-promote",
746            feature = "middleware-relabel",
747            feature = "middleware-unwind"
748        )))]
749        {
750            // Registry should exist even with no middleware
751            let _ = registry;
752        }
753    }
754
755    #[tokio::test]
756    async fn test_middleware_registry_accessible_after_start() {
757        let core = create_test_server().await;
758
759        core.start().await.expect("Failed to start server");
760
761        // Should be accessible after server is started
762        assert!(core.is_running().await);
763        let registry = core.middleware_registry();
764
765        // Verify registry is accessible (test with any available middleware)
766        #[cfg(feature = "middleware-jq")]
767        assert!(registry.get("jq").is_some());
768        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
769        assert!(registry.get("map").is_some());
770
771        // If no middleware features are enabled, just verify the registry exists
772        #[cfg(not(any(
773            feature = "middleware-jq",
774            feature = "middleware-map",
775            feature = "middleware-decoder",
776            feature = "middleware-parse-json",
777            feature = "middleware-promote",
778            feature = "middleware-relabel",
779            feature = "middleware-unwind"
780        )))]
781        {
782            // Registry should exist even with no middleware
783            let _ = registry;
784        }
785
786        core.stop().await.expect("Failed to stop server");
787    }
788}