Skip to main content

drasi_lib/
lib_core.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{anyhow, Result};
16use log::{info, warn};
17use std::sync::Arc;
18use tokio::sync::RwLock;
19
20use crate::channels::*;
21use crate::config::{DrasiLibConfig, RuntimeConfig};
22use crate::inspection::InspectionAPI;
23use crate::lifecycle::LifecycleManager;
24use crate::queries::QueryManager;
25use crate::reactions::ReactionManager;
26use crate::sources::SourceManager;
27use crate::state_guard::StateGuard;
28use drasi_core::middleware::MiddlewareTypeRegistry;
29
30/// Core Drasi Server for continuous query processing
31///
32/// `DrasiLib` is the main entry point for embedding Drasi functionality in your application.
33/// It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions
34/// (output destinations) with a reactive event-driven architecture.
35///
36/// # Architecture
37///
38/// - **Sources**: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
39/// - **Queries**: Continuous Cypher or GQL queries that process data changes in real-time
40/// - **Reactions**: Output destinations that receive query results (HTTP, gRPC, Application, Log)
41///
42/// # Lifecycle States
43///
44/// The server progresses through these states:
45/// 1. **Created** (via `builder()`)
46/// 2. **Initialized** (one-time setup, automatic with builder)
47/// 3. **Running** (after `start()`)
48/// 4. **Stopped** (after `stop()`, can be restarted)
49///
50/// Components (sources, queries, reactions) have independent lifecycle states:
51/// - `Stopped`: Component exists but is not processing
52/// - `Starting`: Component is initializing
53/// - `Running`: Component is actively processing
54/// - `Stopping`: Component is shutting down
55///
56/// # Thread Safety
57///
58/// `DrasiLib` is `Clone` (all clones share the same underlying state) and all methods
59/// are thread-safe. You can safely share clones across threads and call methods concurrently.
60///
61/// # Examples
62///
63/// ## Builder Pattern
64///
65/// ```ignore
66/// use drasi_lib::{DrasiLib, Query};
67///
68/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
69/// let core = DrasiLib::builder()
70///     .with_id("my-server")
71///     .with_source(my_source)  // Pre-built source instance
72///     .with_query(
73///         Query::cypher("my-query")
74///             .query("MATCH (n:Person) RETURN n.name, n.age")
75///             .from_source("events")
76///             .auto_start(true)
77///             .build()
78///     )
79///     .with_reaction(my_reaction)  // Pre-built reaction instance
80///     .build()
81///     .await?;
82///
83/// // Start all auto-start components
84/// core.start().await?;
85///
86/// // List and inspect components
87/// let sources = core.list_sources().await?;
88/// let queries = core.list_queries().await?;
89///
90/// // Stop server
91/// core.stop().await?;
92/// # Ok(())
93/// # }
94/// ```
95///
96/// ## Dynamic Runtime Configuration
97///
98/// ```ignore
99/// use drasi_lib::{DrasiLib, Query};
100///
101/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
102/// let core = DrasiLib::builder()
103///     .with_id("dynamic-server")
104///     .build()
105///     .await?;
106///
107/// core.start().await?;
108///
109/// // Add components at runtime
110/// core.add_source(new_source_instance).await?;
111///
112/// core.add_query(
113///     Query::cypher("new-query")
114///         .query("MATCH (n) RETURN n")
115///         .from_source("new-source")
116///         .auto_start(true)
117///         .build()
118/// ).await?;
119///
120/// // Start/stop individual components
121/// core.stop_query("new-query").await?;
122/// core.start_query("new-query").await?;
123///
124/// // Remove components
125/// core.remove_query("new-query").await?;
126/// core.remove_source("new-source").await?;
127/// # Ok(())
128/// # }
129/// ```
130///
131/// ## Restart Behavior
132///
133/// When you call `stop()` and then `start()` again, only components with `auto_start=true`
134/// will be started. Components that were manually started (with `auto_start=false`) will
135/// remain stopped:
136///
137/// ```no_run
138/// # use drasi_lib::DrasiLib;
139/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
140/// # let core = DrasiLib::builder().build().await?;
141/// core.start().await?;
142/// // ... only auto_start=true components are running ...
143///
144/// core.stop().await?;
145/// // ... all components stopped ...
146///
147/// core.start().await?;
148/// // ... only auto_start=true components restarted ...
149/// # Ok(())
150/// # }
151/// ```
152pub struct DrasiLib {
153    pub(crate) config: Arc<RuntimeConfig>,
154    pub(crate) source_manager: Arc<SourceManager>,
155    pub(crate) query_manager: Arc<QueryManager>,
156    pub(crate) reaction_manager: Arc<ReactionManager>,
157    pub(crate) running: Arc<RwLock<bool>>,
158    pub(crate) state_guard: StateGuard,
159    // Inspection API for querying server state
160    pub(crate) inspection: InspectionAPI,
161    // Lifecycle manager for orchestrating component lifecycle
162    pub(crate) lifecycle: Arc<RwLock<LifecycleManager>>,
163    // Middleware registry for source middleware
164    pub(crate) middleware_registry: Arc<MiddlewareTypeRegistry>,
165}
166
167impl Clone for DrasiLib {
168    fn clone(&self) -> Self {
169        Self {
170            config: Arc::clone(&self.config),
171            source_manager: Arc::clone(&self.source_manager),
172            query_manager: Arc::clone(&self.query_manager),
173            reaction_manager: Arc::clone(&self.reaction_manager),
174            running: Arc::clone(&self.running),
175            state_guard: self.state_guard.clone(),
176            inspection: self.inspection.clone(),
177            lifecycle: Arc::clone(&self.lifecycle),
178            middleware_registry: Arc::clone(&self.middleware_registry),
179        }
180    }
181}
182
183impl DrasiLib {
184    // ============================================================================
185    // Construction and Initialization
186    // ============================================================================
187
188    /// Create an Arc-wrapped reference to self
189    ///
190    /// Since DrasiLib contains all Arc-wrapped fields, cloning is cheap
191    /// (just increments ref counts), but this helper makes the intent clearer
192    /// and provides a single place to document this pattern.
193    pub(crate) fn as_arc(&self) -> Arc<Self> {
194        Arc::new(self.clone())
195    }
196
197    /// Internal constructor - creates uninitialized server
198    /// Use `builder()` instead
199    pub(crate) fn new(config: Arc<RuntimeConfig>) -> Self {
200        let (channels, receivers) = EventChannels::new();
201
202        let source_manager = Arc::new(SourceManager::new(channels.component_event_tx.clone()));
203
204        // Initialize middleware registry and register all standard middleware factories
205        let mut middleware_registry = MiddlewareTypeRegistry::new();
206
207        #[cfg(feature = "middleware-jq")]
208        middleware_registry.register(Arc::new(drasi_middleware::jq::JQFactory::new()));
209
210        #[cfg(feature = "middleware-map")]
211        middleware_registry.register(Arc::new(drasi_middleware::map::MapFactory::new()));
212
213        #[cfg(feature = "middleware-unwind")]
214        middleware_registry.register(Arc::new(drasi_middleware::unwind::UnwindFactory::new()));
215
216        #[cfg(feature = "middleware-relabel")]
217        middleware_registry.register(Arc::new(
218            drasi_middleware::relabel::RelabelMiddlewareFactory::new(),
219        ));
220
221        #[cfg(feature = "middleware-decoder")]
222        middleware_registry.register(Arc::new(drasi_middleware::decoder::DecoderFactory::new()));
223
224        #[cfg(feature = "middleware-parse-json")]
225        middleware_registry.register(Arc::new(
226            drasi_middleware::parse_json::ParseJsonFactory::new(),
227        ));
228
229        #[cfg(feature = "middleware-promote")]
230        middleware_registry.register(Arc::new(
231            drasi_middleware::promote::PromoteMiddlewareFactory::new(),
232        ));
233
234        let middleware_registry = Arc::new(middleware_registry);
235
236        let query_manager = Arc::new(QueryManager::new(
237            channels.component_event_tx.clone(),
238            source_manager.clone(),
239            config.index_factory.clone(),
240            middleware_registry.clone(),
241        ));
242
243        let reaction_manager = Arc::new(ReactionManager::new(channels.component_event_tx.clone()));
244
245        let state_guard = StateGuard::new();
246
247        let inspection = InspectionAPI::new(
248            source_manager.clone(),
249            query_manager.clone(),
250            reaction_manager.clone(),
251            state_guard.clone(),
252            config.clone(),
253        );
254
255        let lifecycle = Arc::new(RwLock::new(LifecycleManager::new(
256            config.clone(),
257            source_manager.clone(),
258            query_manager.clone(),
259            reaction_manager.clone(),
260            Some(receivers),
261        )));
262
263        Self {
264            config,
265            source_manager,
266            query_manager,
267            reaction_manager,
268            running: Arc::new(RwLock::new(false)),
269            state_guard,
270            inspection,
271            lifecycle,
272            middleware_registry,
273        }
274    }
275
276    /// Internal initialization - performs one-time setup
277    /// This is called internally by builder and config loaders
278    pub(crate) async fn initialize(&mut self) -> Result<()> {
279        let already_initialized = self.state_guard.is_initialized().await;
280        if already_initialized {
281            info!("Server already initialized, skipping initialization");
282            return Ok(());
283        }
284
285        info!("Initializing Drasi Server Core");
286
287        // Inject QueryProvider into ReactionManager
288        // This allows reactions to access queries when they start
289        let query_provider: Arc<dyn crate::reactions::QueryProvider> = self.as_arc();
290        self.reaction_manager
291            .inject_query_provider(query_provider)
292            .await;
293
294        // Inject StateStoreProvider into SourceManager and ReactionManager
295        // This allows sources and reactions to persist state
296        let state_store = self.config.state_store_provider.clone();
297        self.source_manager
298            .inject_state_store(state_store.clone())
299            .await;
300        self.reaction_manager.inject_state_store(state_store).await;
301
302        // Load configuration
303        let lifecycle = self.lifecycle.read().await;
304        lifecycle.load_configuration().await?;
305        drop(lifecycle);
306
307        // Start event processors (one-time)
308        let mut lifecycle = self.lifecycle.write().await;
309        lifecycle.start_event_processors().await;
310        drop(lifecycle);
311
312        self.state_guard.mark_initialized().await;
313        info!("Drasi Server Core initialized successfully");
314        Ok(())
315    }
316
317    // ============================================================================
318    // Lifecycle Operations (start/stop)
319    // ============================================================================
320
321    /// Start the server and all auto-start components
322    ///
323    /// This starts all components (sources, queries, reactions) that have `auto_start` set to `true`,
324    /// as well as any components that were running when `stop()` was last called.
325    ///
326    /// Components are started in dependency order: Sources → Queries → Reactions
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if:
331    /// * The server is not initialized (`DrasiError::InvalidState`)
332    /// * The server is already running (`anyhow::Error`)
333    /// * Any component fails to start (propagated from component)
334    ///
335    /// # Examples
336    ///
337    /// ```no_run
338    /// # use drasi_lib::DrasiLib;
339    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
340    /// let core = DrasiLib::builder()
341    ///     .with_id("my-server")
342    ///     .build()
343    ///     .await?;
344    ///
345    /// // Start server and all auto-start components
346    /// core.start().await?;
347    ///
348    /// assert!(core.is_running().await);
349    /// # Ok(())
350    /// # }
351    /// ```
352    pub async fn start(&self) -> Result<()> {
353        let mut running = self.running.write().await;
354        if *running {
355            warn!("Server is already running");
356            return Err(anyhow!("Server is already running"));
357        }
358
359        info!("Starting Drasi Server Core");
360
361        // Ensure initialized
362        if !self.state_guard.is_initialized().await {
363            return Err(anyhow!("Server must be initialized before starting"));
364        }
365
366        // Start all configured components
367        let lifecycle = self.lifecycle.read().await;
368        lifecycle.start_components().await?;
369
370        *running = true;
371        info!("Drasi Server Core started successfully");
372
373        Ok(())
374    }
375
376    /// Stop the server and all running components
377    ///
378    /// This stops all currently running components (sources, queries, reactions).
379    /// Components are stopped in reverse dependency order: Reactions → Queries → Sources
380    ///
381    /// On the next `start()`, only components with `auto_start=true` will be restarted.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if:
386    /// * The server is not running (`anyhow::Error`)
387    /// * Any component fails to stop (logged as error, but doesn't prevent other components from stopping)
388    ///
389    /// # Examples
390    ///
391    /// ```no_run
392    /// # use drasi_lib::DrasiLib;
393    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
394    /// # let core = DrasiLib::builder().build().await?;
395    /// # core.start().await?;
396    /// // Stop server and all running components
397    /// core.stop().await?;
398    ///
399    /// assert!(!core.is_running().await);
400    ///
401    /// // Only auto_start=true components will be restarted
402    /// core.start().await?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    pub async fn stop(&self) -> Result<()> {
407        let mut running = self.running.write().await;
408        if !*running {
409            warn!("Server is already stopped");
410            return Err(anyhow!("Server is already stopped"));
411        }
412
413        info!("Stopping Drasi Server Core");
414
415        // Stop all components
416        let lifecycle = self.lifecycle.read().await;
417        lifecycle.stop_all_components().await?;
418
419        *running = false;
420        info!("Drasi Server Core stopped successfully");
421
422        Ok(())
423    }
424
425    // ============================================================================
426    // Handle Access (for advanced usage)
427    // ============================================================================
428
429    /// Get direct access to the query manager (advanced usage)
430    ///
431    /// This provides low-level access to the query manager for advanced scenarios.
432    /// Most users should use the higher-level methods like `get_query_info()`,
433    /// `start_query()`, etc. instead.
434    ///
435    /// # Thread Safety
436    ///
437    /// The returned reference is thread-safe and can be used across threads.
438    pub fn query_manager(&self) -> &QueryManager {
439        &self.query_manager
440    }
441
442    /// Get access to the middleware registry
443    ///
444    /// Returns a reference to the middleware type registry that contains all registered
445    /// middleware factories. The registry is pre-populated with all standard middleware
446    /// types (jq, map, unwind, relabel, decoder, parse_json, promote).
447    ///
448    /// # Thread Safety
449    ///
450    /// The returned Arc can be cloned and used across threads.
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// # use drasi_lib::DrasiLib;
456    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
457    /// let core = DrasiLib::builder().build().await?;
458    /// let registry = core.middleware_registry();
459    /// // Use registry to create middleware instances
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry> {
464        Arc::clone(&self.middleware_registry)
465    }
466
467    // ============================================================================
468    // Configuration Snapshot
469    // ============================================================================
470
471    /// Get a complete configuration snapshot of all components
472    ///
473    /// Returns the full server configuration including all queries with their complete configurations.
474    /// Note: Sources and reactions are now instance-based and not stored in config.
475    /// Use `list_sources()` and `list_reactions()` for runtime information about these components.
476    ///
477    /// # Example
478    /// ```no_run
479    /// # use drasi_lib::DrasiLib;
480    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
481    /// let config = core.get_current_config().await?;
482    /// println!("Server has {} queries", config.queries.len());
483    /// # Ok(())
484    /// # }
485    /// ```
486    pub async fn get_current_config(&self) -> crate::error::Result<DrasiLibConfig> {
487        self.inspection.get_current_config().await
488    }
489
490    // ============================================================================
491    // Builder and Config File Loading
492    // ============================================================================
493
494    /// Create a builder for configuring a new DrasiLib instance.
495    ///
496    /// The builder provides a fluent API for adding queries and source/reaction instances.
497    /// Note: Sources and reactions are now instance-based. Use `with_source()` and `with_reaction()`
498    /// to add pre-built instances.
499    ///
500    /// # Example
501    /// ```no_run
502    /// use drasi_lib::{DrasiLib, Query};
503    ///
504    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
505    /// let core = DrasiLib::builder()
506    ///     .with_id("my-server")
507    ///     .with_query(
508    ///         Query::cypher("my-query")
509    ///             .query("MATCH (n) RETURN n")
510    ///             .from_source("events")
511    ///             .build()
512    ///     )
513    ///     // Use .with_source(source_instance) and .with_reaction(reaction_instance)
514    ///     // for pre-built source and reaction instances
515    ///     .build()
516    ///     .await?;
517    ///
518    /// core.start().await?;
519    /// # Ok(())
520    /// # }
521    /// ```
522    pub fn builder() -> crate::builder::DrasiLibBuilder {
523        crate::builder::DrasiLibBuilder::new()
524    }
525
526    // ============================================================================
527    // Server Status
528    // ============================================================================
529
530    /// Check if the server is currently running
531    ///
532    /// Returns `true` if `start()` has been called and the server is actively processing,
533    /// `false` if the server is stopped or has not been started yet.
534    ///
535    /// # Thread Safety
536    ///
537    /// This method is thread-safe and can be called concurrently.
538    ///
539    /// # Examples
540    ///
541    /// ```no_run
542    /// # use drasi_lib::DrasiLib;
543    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
544    /// let core = DrasiLib::builder().build().await?;
545    ///
546    /// assert!(!core.is_running().await); // Not started yet
547    ///
548    /// core.start().await?;
549    /// assert!(core.is_running().await); // Now running
550    ///
551    /// core.stop().await?;
552    /// assert!(!core.is_running().await); // Stopped again
553    /// # Ok(())
554    /// # }
555    /// ```
556    pub async fn is_running(&self) -> bool {
557        *self.running.read().await
558    }
559
560    /// Get the runtime configuration
561    ///
562    /// Returns a reference to the immutable runtime configuration containing server settings
563    /// and all component configurations. This is the configuration that was provided during
564    /// initialization (via builder, config file, or config string).
565    ///
566    /// For a current snapshot of the configuration including runtime additions, use
567    /// [`get_current_config()`](Self::get_current_config) instead.
568    ///
569    /// # Thread Safety
570    ///
571    /// This method is thread-safe and can be called concurrently.
572    ///
573    /// # Examples
574    ///
575    /// ```no_run
576    /// # use drasi_lib::DrasiLib;
577    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
578    /// let core = DrasiLib::builder()
579    ///     .with_id("my-server")
580    ///     .build()
581    ///     .await?;
582    ///
583    /// let config = core.get_config();
584    /// println!("Server ID: {}", config.id);
585    /// println!("Number of queries: {}", config.queries.len());
586    /// # Ok(())
587    /// # }
588    /// ```
589    pub fn get_config(&self) -> &RuntimeConfig {
590        &self.config
591    }
592}
593
594// ============================================================================
595// QueryProvider Trait Implementation
596// ============================================================================
597
598// Implement QueryProvider trait for DrasiLib
599// This breaks the circular dependency by providing a minimal interface for reactions
600#[async_trait::async_trait]
601impl crate::reactions::QueryProvider for DrasiLib {
602    async fn get_query_instance(&self, id: &str) -> Result<Arc<dyn crate::queries::Query>> {
603        self.query_manager
604            .get_query_instance(id)
605            .await
606            .map_err(|e| anyhow::anyhow!(e))
607    }
608}
609
610// ============================================================================
611// Tests
612// ============================================================================
613
614#[cfg(test)]
615mod tests {
616    use super::*;
617
618    async fn create_test_server() -> DrasiLib {
619        DrasiLib::builder()
620            .with_id("test-server")
621            .build()
622            .await
623            .expect("Failed to build server")
624    }
625
626    #[tokio::test]
627    async fn test_middleware_registry_is_initialized() {
628        let core = create_test_server().await;
629
630        let registry = core.middleware_registry();
631
632        // Verify that middleware factories are registered when their features are enabled
633        #[cfg(feature = "middleware-jq")]
634        assert!(
635            registry.get("jq").is_some(),
636            "JQ factory should be registered"
637        );
638        #[cfg(feature = "middleware-map")]
639        assert!(
640            registry.get("map").is_some(),
641            "Map factory should be registered"
642        );
643        #[cfg(feature = "middleware-unwind")]
644        assert!(
645            registry.get("unwind").is_some(),
646            "Unwind factory should be registered"
647        );
648        #[cfg(feature = "middleware-relabel")]
649        assert!(
650            registry.get("relabel").is_some(),
651            "Relabel factory should be registered"
652        );
653        #[cfg(feature = "middleware-decoder")]
654        assert!(
655            registry.get("decoder").is_some(),
656            "Decoder factory should be registered"
657        );
658        #[cfg(feature = "middleware-parse-json")]
659        assert!(
660            registry.get("parse_json").is_some(),
661            "ParseJson factory should be registered"
662        );
663        #[cfg(feature = "middleware-promote")]
664        assert!(
665            registry.get("promote").is_some(),
666            "Promote factory should be registered"
667        );
668    }
669
670    #[tokio::test]
671    async fn test_middleware_registry_arc_sharing() {
672        let core = create_test_server().await;
673
674        let registry1 = core.middleware_registry();
675        let registry2 = core.middleware_registry();
676
677        // Both Arc instances should point to the same underlying registry
678        // We can't directly test Arc equality, but we can verify both work
679        // Test with any available middleware feature
680        #[cfg(feature = "middleware-jq")]
681        {
682            assert!(registry1.get("jq").is_some());
683            assert!(registry2.get("jq").is_some());
684        }
685        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
686        {
687            assert!(registry1.get("map").is_some());
688            assert!(registry2.get("map").is_some());
689        }
690        #[cfg(all(
691            feature = "middleware-decoder",
692            not(feature = "middleware-jq"),
693            not(feature = "middleware-map")
694        ))]
695        {
696            assert!(registry1.get("decoder").is_some());
697            assert!(registry2.get("decoder").is_some());
698        }
699    }
700
701    #[tokio::test]
702    async fn test_middleware_registry_accessible_before_start() {
703        let core = create_test_server().await;
704
705        // Should be accessible even before server is started
706        assert!(!core.is_running().await);
707        let registry = core.middleware_registry();
708
709        // Verify registry is accessible (test with any available middleware)
710        #[cfg(feature = "middleware-jq")]
711        assert!(registry.get("jq").is_some());
712        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
713        assert!(registry.get("map").is_some());
714
715        // If no middleware features are enabled, just verify the registry exists
716        #[cfg(not(any(
717            feature = "middleware-jq",
718            feature = "middleware-map",
719            feature = "middleware-decoder",
720            feature = "middleware-parse-json",
721            feature = "middleware-promote",
722            feature = "middleware-relabel",
723            feature = "middleware-unwind"
724        )))]
725        {
726            // Registry should exist even with no middleware
727            let _ = registry;
728        }
729    }
730
731    #[tokio::test]
732    async fn test_middleware_registry_accessible_after_start() {
733        let core = create_test_server().await;
734
735        core.start().await.expect("Failed to start server");
736
737        // Should be accessible after server is started
738        assert!(core.is_running().await);
739        let registry = core.middleware_registry();
740
741        // Verify registry is accessible (test with any available middleware)
742        #[cfg(feature = "middleware-jq")]
743        assert!(registry.get("jq").is_some());
744        #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
745        assert!(registry.get("map").is_some());
746
747        // If no middleware features are enabled, just verify the registry exists
748        #[cfg(not(any(
749            feature = "middleware-jq",
750            feature = "middleware-map",
751            feature = "middleware-decoder",
752            feature = "middleware-parse-json",
753            feature = "middleware-promote",
754            feature = "middleware-relabel",
755            feature = "middleware-unwind"
756        )))]
757        {
758            // Registry should exist even with no middleware
759            let _ = registry;
760        }
761
762        core.stop().await.expect("Failed to stop server");
763    }
764}