drasi_lib/lib_core.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{anyhow, Result};
16use log::{info, warn};
17use std::sync::Arc;
18use tokio::sync::RwLock;
19
20use crate::channels::*;
21use crate::config::{DrasiLibConfig, RuntimeConfig};
22use crate::inspection::InspectionAPI;
23use crate::lifecycle::LifecycleManager;
24use crate::queries::QueryManager;
25use crate::reactions::ReactionManager;
26use crate::sources::SourceManager;
27use crate::state_guard::StateGuard;
28use drasi_core::middleware::MiddlewareTypeRegistry;
29
30/// Core Drasi Server for continuous query processing
31///
32/// `DrasiLib` is the main entry point for embedding Drasi functionality in your application.
33/// It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions
34/// (output destinations) with a reactive event-driven architecture.
35///
36/// # Architecture
37///
38/// - **Sources**: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
39/// - **Queries**: Continuous Cypher or GQL queries that process data changes in real-time
40/// - **Reactions**: Output destinations that receive query results (HTTP, gRPC, Application, Log)
41///
42/// # Lifecycle States
43///
44/// The server progresses through these states:
45/// 1. **Created** (via `builder()`)
46/// 2. **Initialized** (one-time setup, automatic with builder)
47/// 3. **Running** (after `start()`)
48/// 4. **Stopped** (after `stop()`, can be restarted)
49///
50/// Components (sources, queries, reactions) have independent lifecycle states:
51/// - `Stopped`: Component exists but is not processing
52/// - `Starting`: Component is initializing
53/// - `Running`: Component is actively processing
54/// - `Stopping`: Component is shutting down
55///
56/// # Thread Safety
57///
58/// `DrasiLib` is `Clone` (all clones share the same underlying state) and all methods
59/// are thread-safe. You can safely share clones across threads and call methods concurrently.
60///
61/// # Examples
62///
63/// ## Builder Pattern
64///
65/// ```ignore
66/// use drasi_lib::{DrasiLib, Query};
67///
68/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
69/// let core = DrasiLib::builder()
70/// .with_id("my-server")
71/// .with_source(my_source) // Pre-built source instance
72/// .with_query(
73/// Query::cypher("my-query")
74/// .query("MATCH (n:Person) RETURN n.name, n.age")
75/// .from_source("events")
76/// .auto_start(true)
77/// .build()
78/// )
79/// .with_reaction(my_reaction) // Pre-built reaction instance
80/// .build()
81/// .await?;
82///
83/// // Start all auto-start components
84/// core.start().await?;
85///
86/// // List and inspect components
87/// let sources = core.list_sources().await?;
88/// let queries = core.list_queries().await?;
89///
90/// // Stop server
91/// core.stop().await?;
92/// # Ok(())
93/// # }
94/// ```
95///
96/// ## Dynamic Runtime Configuration
97///
98/// ```ignore
99/// use drasi_lib::{DrasiLib, Query};
100///
101/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
102/// let core = DrasiLib::builder()
103/// .with_id("dynamic-server")
104/// .build()
105/// .await?;
106///
107/// core.start().await?;
108///
109/// // Add components at runtime
110/// core.add_source(new_source_instance).await?;
111///
112/// core.add_query(
113/// Query::cypher("new-query")
114/// .query("MATCH (n) RETURN n")
115/// .from_source("new-source")
116/// .auto_start(true)
117/// .build()
118/// ).await?;
119///
120/// // Start/stop individual components
121/// core.stop_query("new-query").await?;
122/// core.start_query("new-query").await?;
123///
124/// // Remove components
125/// core.remove_query("new-query").await?;
126/// core.remove_source("new-source").await?;
127/// # Ok(())
128/// # }
129/// ```
130///
131/// ## Restart Behavior
132///
133/// When you call `stop()` and then `start()` again, only components with `auto_start=true`
134/// will be started. Components that were manually started (with `auto_start=false`) will
135/// remain stopped:
136///
137/// ```no_run
138/// # use drasi_lib::DrasiLib;
139/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
140/// # let core = DrasiLib::builder().build().await?;
141/// core.start().await?;
142/// // ... only auto_start=true components are running ...
143///
144/// core.stop().await?;
145/// // ... all components stopped ...
146///
147/// core.start().await?;
148/// // ... only auto_start=true components restarted ...
149/// # Ok(())
150/// # }
151/// ```
152pub struct DrasiLib {
153 pub(crate) config: Arc<RuntimeConfig>,
154 pub(crate) source_manager: Arc<SourceManager>,
155 pub(crate) query_manager: Arc<QueryManager>,
156 pub(crate) reaction_manager: Arc<ReactionManager>,
157 pub(crate) running: Arc<RwLock<bool>>,
158 pub(crate) state_guard: StateGuard,
159 // Inspection API for querying server state
160 pub(crate) inspection: InspectionAPI,
161 // Lifecycle manager for orchestrating component lifecycle
162 pub(crate) lifecycle: Arc<RwLock<LifecycleManager>>,
163 // Middleware registry for source middleware
164 pub(crate) middleware_registry: Arc<MiddlewareTypeRegistry>,
165}
166
167impl Clone for DrasiLib {
168 fn clone(&self) -> Self {
169 Self {
170 config: Arc::clone(&self.config),
171 source_manager: Arc::clone(&self.source_manager),
172 query_manager: Arc::clone(&self.query_manager),
173 reaction_manager: Arc::clone(&self.reaction_manager),
174 running: Arc::clone(&self.running),
175 state_guard: self.state_guard.clone(),
176 inspection: self.inspection.clone(),
177 lifecycle: Arc::clone(&self.lifecycle),
178 middleware_registry: Arc::clone(&self.middleware_registry),
179 }
180 }
181}
182
183impl DrasiLib {
184 // ============================================================================
185 // Construction and Initialization
186 // ============================================================================
187
188 /// Create an Arc-wrapped reference to self
189 ///
190 /// Since DrasiLib contains all Arc-wrapped fields, cloning is cheap
191 /// (just increments ref counts), but this helper makes the intent clearer
192 /// and provides a single place to document this pattern.
193 pub(crate) fn as_arc(&self) -> Arc<Self> {
194 Arc::new(self.clone())
195 }
196
197 /// Internal constructor - creates uninitialized server
198 /// Use `builder()` instead
199 pub(crate) fn new(config: Arc<RuntimeConfig>) -> Self {
200 let (channels, receivers) = EventChannels::new();
201
202 let source_manager = Arc::new(SourceManager::new(channels.component_event_tx.clone()));
203
204 // Initialize middleware registry and register all standard middleware factories
205 let mut middleware_registry = MiddlewareTypeRegistry::new();
206
207 #[cfg(feature = "middleware-jq")]
208 middleware_registry.register(Arc::new(drasi_middleware::jq::JQFactory::new()));
209
210 #[cfg(feature = "middleware-map")]
211 middleware_registry.register(Arc::new(drasi_middleware::map::MapFactory::new()));
212
213 #[cfg(feature = "middleware-unwind")]
214 middleware_registry.register(Arc::new(drasi_middleware::unwind::UnwindFactory::new()));
215
216 #[cfg(feature = "middleware-relabel")]
217 middleware_registry.register(Arc::new(
218 drasi_middleware::relabel::RelabelMiddlewareFactory::new(),
219 ));
220
221 #[cfg(feature = "middleware-decoder")]
222 middleware_registry.register(Arc::new(drasi_middleware::decoder::DecoderFactory::new()));
223
224 #[cfg(feature = "middleware-parse-json")]
225 middleware_registry.register(Arc::new(
226 drasi_middleware::parse_json::ParseJsonFactory::new(),
227 ));
228
229 #[cfg(feature = "middleware-promote")]
230 middleware_registry.register(Arc::new(
231 drasi_middleware::promote::PromoteMiddlewareFactory::new(),
232 ));
233
234 let middleware_registry = Arc::new(middleware_registry);
235
236 let query_manager = Arc::new(QueryManager::new(
237 channels.component_event_tx.clone(),
238 source_manager.clone(),
239 config.index_factory.clone(),
240 middleware_registry.clone(),
241 ));
242
243 let reaction_manager = Arc::new(ReactionManager::new(channels.component_event_tx.clone()));
244
245 let state_guard = StateGuard::new();
246
247 let inspection = InspectionAPI::new(
248 source_manager.clone(),
249 query_manager.clone(),
250 reaction_manager.clone(),
251 state_guard.clone(),
252 config.clone(),
253 );
254
255 let lifecycle = Arc::new(RwLock::new(LifecycleManager::new(
256 config.clone(),
257 source_manager.clone(),
258 query_manager.clone(),
259 reaction_manager.clone(),
260 Some(receivers),
261 )));
262
263 Self {
264 config,
265 source_manager,
266 query_manager,
267 reaction_manager,
268 running: Arc::new(RwLock::new(false)),
269 state_guard,
270 inspection,
271 lifecycle,
272 middleware_registry,
273 }
274 }
275
276 /// Internal initialization - performs one-time setup
277 /// This is called internally by builder and config loaders
278 pub(crate) async fn initialize(&mut self) -> Result<()> {
279 let already_initialized = self.state_guard.is_initialized().await;
280 if already_initialized {
281 info!("Server already initialized, skipping initialization");
282 return Ok(());
283 }
284
285 info!("Initializing Drasi Server Core");
286
287 // Inject QueryProvider into ReactionManager
288 // This allows reactions to access queries when they start
289 let query_provider: Arc<dyn crate::reactions::QueryProvider> = self.as_arc();
290 self.reaction_manager
291 .inject_query_provider(query_provider)
292 .await;
293
294 // Inject StateStoreProvider into SourceManager and ReactionManager
295 // This allows sources and reactions to persist state
296 let state_store = self.config.state_store_provider.clone();
297 self.source_manager
298 .inject_state_store(state_store.clone())
299 .await;
300 self.reaction_manager.inject_state_store(state_store).await;
301
302 // Load configuration
303 let lifecycle = self.lifecycle.read().await;
304 lifecycle.load_configuration().await?;
305 drop(lifecycle);
306
307 // Start event processors (one-time)
308 let mut lifecycle = self.lifecycle.write().await;
309 lifecycle.start_event_processors().await;
310 drop(lifecycle);
311
312 self.state_guard.mark_initialized().await;
313 info!("Drasi Server Core initialized successfully");
314 Ok(())
315 }
316
317 // ============================================================================
318 // Lifecycle Operations (start/stop)
319 // ============================================================================
320
321 /// Start the server and all auto-start components
322 ///
323 /// This starts all components (sources, queries, reactions) that have `auto_start` set to `true`,
324 /// as well as any components that were running when `stop()` was last called.
325 ///
326 /// Components are started in dependency order: Sources → Queries → Reactions
327 ///
328 /// # Errors
329 ///
330 /// Returns an error if:
331 /// * The server is not initialized (`DrasiError::InvalidState`)
332 /// * The server is already running (`anyhow::Error`)
333 /// * Any component fails to start (propagated from component)
334 ///
335 /// # Examples
336 ///
337 /// ```no_run
338 /// # use drasi_lib::DrasiLib;
339 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
340 /// let core = DrasiLib::builder()
341 /// .with_id("my-server")
342 /// .build()
343 /// .await?;
344 ///
345 /// // Start server and all auto-start components
346 /// core.start().await?;
347 ///
348 /// assert!(core.is_running().await);
349 /// # Ok(())
350 /// # }
351 /// ```
352 pub async fn start(&self) -> Result<()> {
353 let mut running = self.running.write().await;
354 if *running {
355 warn!("Server is already running");
356 return Err(anyhow!("Server is already running"));
357 }
358
359 info!("Starting Drasi Server Core");
360
361 // Ensure initialized
362 if !self.state_guard.is_initialized().await {
363 return Err(anyhow!("Server must be initialized before starting"));
364 }
365
366 // Start all configured components
367 let lifecycle = self.lifecycle.read().await;
368 lifecycle.start_components().await?;
369
370 *running = true;
371 info!("Drasi Server Core started successfully");
372
373 Ok(())
374 }
375
376 /// Stop the server and all running components
377 ///
378 /// This stops all currently running components (sources, queries, reactions).
379 /// Components are stopped in reverse dependency order: Reactions → Queries → Sources
380 ///
381 /// On the next `start()`, only components with `auto_start=true` will be restarted.
382 ///
383 /// # Errors
384 ///
385 /// Returns an error if:
386 /// * The server is not running (`anyhow::Error`)
387 /// * Any component fails to stop (logged as error, but doesn't prevent other components from stopping)
388 ///
389 /// # Examples
390 ///
391 /// ```no_run
392 /// # use drasi_lib::DrasiLib;
393 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
394 /// # let core = DrasiLib::builder().build().await?;
395 /// # core.start().await?;
396 /// // Stop server and all running components
397 /// core.stop().await?;
398 ///
399 /// assert!(!core.is_running().await);
400 ///
401 /// // Only auto_start=true components will be restarted
402 /// core.start().await?;
403 /// # Ok(())
404 /// # }
405 /// ```
406 pub async fn stop(&self) -> Result<()> {
407 let mut running = self.running.write().await;
408 if !*running {
409 warn!("Server is already stopped");
410 return Err(anyhow!("Server is already stopped"));
411 }
412
413 info!("Stopping Drasi Server Core");
414
415 // Stop all components
416 let lifecycle = self.lifecycle.read().await;
417 lifecycle.stop_all_components().await?;
418
419 *running = false;
420 info!("Drasi Server Core stopped successfully");
421
422 Ok(())
423 }
424
425 // ============================================================================
426 // Handle Access (for advanced usage)
427 // ============================================================================
428
429 /// Get direct access to the query manager (advanced usage)
430 ///
431 /// This provides low-level access to the query manager for advanced scenarios.
432 /// Most users should use the higher-level methods like `get_query_info()`,
433 /// `start_query()`, etc. instead.
434 ///
435 /// # Thread Safety
436 ///
437 /// The returned reference is thread-safe and can be used across threads.
438 pub fn query_manager(&self) -> &QueryManager {
439 &self.query_manager
440 }
441
442 /// Get access to the middleware registry
443 ///
444 /// Returns a reference to the middleware type registry that contains all registered
445 /// middleware factories. The registry is pre-populated with all standard middleware
446 /// types (jq, map, unwind, relabel, decoder, parse_json, promote).
447 ///
448 /// # Thread Safety
449 ///
450 /// The returned Arc can be cloned and used across threads.
451 ///
452 /// # Examples
453 ///
454 /// ```no_run
455 /// # use drasi_lib::DrasiLib;
456 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
457 /// let core = DrasiLib::builder().build().await?;
458 /// let registry = core.middleware_registry();
459 /// // Use registry to create middleware instances
460 /// # Ok(())
461 /// # }
462 /// ```
463 pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry> {
464 Arc::clone(&self.middleware_registry)
465 }
466
467 // ============================================================================
468 // Configuration Snapshot
469 // ============================================================================
470
471 /// Get a complete configuration snapshot of all components
472 ///
473 /// Returns the full server configuration including all queries with their complete configurations.
474 /// Note: Sources and reactions are now instance-based and not stored in config.
475 /// Use `list_sources()` and `list_reactions()` for runtime information about these components.
476 ///
477 /// # Example
478 /// ```no_run
479 /// # use drasi_lib::DrasiLib;
480 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
481 /// let config = core.get_current_config().await?;
482 /// println!("Server has {} queries", config.queries.len());
483 /// # Ok(())
484 /// # }
485 /// ```
486 pub async fn get_current_config(&self) -> crate::error::Result<DrasiLibConfig> {
487 self.inspection.get_current_config().await
488 }
489
490 // ============================================================================
491 // Builder and Config File Loading
492 // ============================================================================
493
494 /// Create a builder for configuring a new DrasiLib instance.
495 ///
496 /// The builder provides a fluent API for adding queries and source/reaction instances.
497 /// Note: Sources and reactions are now instance-based. Use `with_source()` and `with_reaction()`
498 /// to add pre-built instances.
499 ///
500 /// # Example
501 /// ```no_run
502 /// use drasi_lib::{DrasiLib, Query};
503 ///
504 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
505 /// let core = DrasiLib::builder()
506 /// .with_id("my-server")
507 /// .with_query(
508 /// Query::cypher("my-query")
509 /// .query("MATCH (n) RETURN n")
510 /// .from_source("events")
511 /// .build()
512 /// )
513 /// // Use .with_source(source_instance) and .with_reaction(reaction_instance)
514 /// // for pre-built source and reaction instances
515 /// .build()
516 /// .await?;
517 ///
518 /// core.start().await?;
519 /// # Ok(())
520 /// # }
521 /// ```
522 pub fn builder() -> crate::builder::DrasiLibBuilder {
523 crate::builder::DrasiLibBuilder::new()
524 }
525
526 // ============================================================================
527 // Server Status
528 // ============================================================================
529
530 /// Check if the server is currently running
531 ///
532 /// Returns `true` if `start()` has been called and the server is actively processing,
533 /// `false` if the server is stopped or has not been started yet.
534 ///
535 /// # Thread Safety
536 ///
537 /// This method is thread-safe and can be called concurrently.
538 ///
539 /// # Examples
540 ///
541 /// ```no_run
542 /// # use drasi_lib::DrasiLib;
543 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
544 /// let core = DrasiLib::builder().build().await?;
545 ///
546 /// assert!(!core.is_running().await); // Not started yet
547 ///
548 /// core.start().await?;
549 /// assert!(core.is_running().await); // Now running
550 ///
551 /// core.stop().await?;
552 /// assert!(!core.is_running().await); // Stopped again
553 /// # Ok(())
554 /// # }
555 /// ```
556 pub async fn is_running(&self) -> bool {
557 *self.running.read().await
558 }
559
560 /// Get the runtime configuration
561 ///
562 /// Returns a reference to the immutable runtime configuration containing server settings
563 /// and all component configurations. This is the configuration that was provided during
564 /// initialization (via builder, config file, or config string).
565 ///
566 /// For a current snapshot of the configuration including runtime additions, use
567 /// [`get_current_config()`](Self::get_current_config) instead.
568 ///
569 /// # Thread Safety
570 ///
571 /// This method is thread-safe and can be called concurrently.
572 ///
573 /// # Examples
574 ///
575 /// ```no_run
576 /// # use drasi_lib::DrasiLib;
577 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
578 /// let core = DrasiLib::builder()
579 /// .with_id("my-server")
580 /// .build()
581 /// .await?;
582 ///
583 /// let config = core.get_config();
584 /// println!("Server ID: {}", config.id);
585 /// println!("Number of queries: {}", config.queries.len());
586 /// # Ok(())
587 /// # }
588 /// ```
589 pub fn get_config(&self) -> &RuntimeConfig {
590 &self.config
591 }
592}
593
594// ============================================================================
595// QueryProvider Trait Implementation
596// ============================================================================
597
598// Implement QueryProvider trait for DrasiLib
599// This breaks the circular dependency by providing a minimal interface for reactions
600#[async_trait::async_trait]
601impl crate::reactions::QueryProvider for DrasiLib {
602 async fn get_query_instance(&self, id: &str) -> Result<Arc<dyn crate::queries::Query>> {
603 self.query_manager
604 .get_query_instance(id)
605 .await
606 .map_err(|e| anyhow::anyhow!(e))
607 }
608}
609
610// ============================================================================
611// Tests
612// ============================================================================
613
614#[cfg(test)]
615mod tests {
616 use super::*;
617
618 async fn create_test_server() -> DrasiLib {
619 DrasiLib::builder()
620 .with_id("test-server")
621 .build()
622 .await
623 .expect("Failed to build server")
624 }
625
626 #[tokio::test]
627 async fn test_middleware_registry_is_initialized() {
628 let core = create_test_server().await;
629
630 let registry = core.middleware_registry();
631
632 // Verify that middleware factories are registered when their features are enabled
633 #[cfg(feature = "middleware-jq")]
634 assert!(
635 registry.get("jq").is_some(),
636 "JQ factory should be registered"
637 );
638 #[cfg(feature = "middleware-map")]
639 assert!(
640 registry.get("map").is_some(),
641 "Map factory should be registered"
642 );
643 #[cfg(feature = "middleware-unwind")]
644 assert!(
645 registry.get("unwind").is_some(),
646 "Unwind factory should be registered"
647 );
648 #[cfg(feature = "middleware-relabel")]
649 assert!(
650 registry.get("relabel").is_some(),
651 "Relabel factory should be registered"
652 );
653 #[cfg(feature = "middleware-decoder")]
654 assert!(
655 registry.get("decoder").is_some(),
656 "Decoder factory should be registered"
657 );
658 #[cfg(feature = "middleware-parse-json")]
659 assert!(
660 registry.get("parse_json").is_some(),
661 "ParseJson factory should be registered"
662 );
663 #[cfg(feature = "middleware-promote")]
664 assert!(
665 registry.get("promote").is_some(),
666 "Promote factory should be registered"
667 );
668 }
669
670 #[tokio::test]
671 async fn test_middleware_registry_arc_sharing() {
672 let core = create_test_server().await;
673
674 let registry1 = core.middleware_registry();
675 let registry2 = core.middleware_registry();
676
677 // Both Arc instances should point to the same underlying registry
678 // We can't directly test Arc equality, but we can verify both work
679 // Test with any available middleware feature
680 #[cfg(feature = "middleware-jq")]
681 {
682 assert!(registry1.get("jq").is_some());
683 assert!(registry2.get("jq").is_some());
684 }
685 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
686 {
687 assert!(registry1.get("map").is_some());
688 assert!(registry2.get("map").is_some());
689 }
690 #[cfg(all(
691 feature = "middleware-decoder",
692 not(feature = "middleware-jq"),
693 not(feature = "middleware-map")
694 ))]
695 {
696 assert!(registry1.get("decoder").is_some());
697 assert!(registry2.get("decoder").is_some());
698 }
699 }
700
701 #[tokio::test]
702 async fn test_middleware_registry_accessible_before_start() {
703 let core = create_test_server().await;
704
705 // Should be accessible even before server is started
706 assert!(!core.is_running().await);
707 let registry = core.middleware_registry();
708
709 // Verify registry is accessible (test with any available middleware)
710 #[cfg(feature = "middleware-jq")]
711 assert!(registry.get("jq").is_some());
712 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
713 assert!(registry.get("map").is_some());
714
715 // If no middleware features are enabled, just verify the registry exists
716 #[cfg(not(any(
717 feature = "middleware-jq",
718 feature = "middleware-map",
719 feature = "middleware-decoder",
720 feature = "middleware-parse-json",
721 feature = "middleware-promote",
722 feature = "middleware-relabel",
723 feature = "middleware-unwind"
724 )))]
725 {
726 // Registry should exist even with no middleware
727 let _ = registry;
728 }
729 }
730
731 #[tokio::test]
732 async fn test_middleware_registry_accessible_after_start() {
733 let core = create_test_server().await;
734
735 core.start().await.expect("Failed to start server");
736
737 // Should be accessible after server is started
738 assert!(core.is_running().await);
739 let registry = core.middleware_registry();
740
741 // Verify registry is accessible (test with any available middleware)
742 #[cfg(feature = "middleware-jq")]
743 assert!(registry.get("jq").is_some());
744 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
745 assert!(registry.get("map").is_some());
746
747 // If no middleware features are enabled, just verify the registry exists
748 #[cfg(not(any(
749 feature = "middleware-jq",
750 feature = "middleware-map",
751 feature = "middleware-decoder",
752 feature = "middleware-parse-json",
753 feature = "middleware-promote",
754 feature = "middleware-relabel",
755 feature = "middleware-unwind"
756 )))]
757 {
758 // Registry should exist even with no middleware
759 let _ = registry;
760 }
761
762 core.stop().await.expect("Failed to stop server");
763 }
764}