drasi_lib/lib_core.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{anyhow, Result};
16use log::{info, warn};
17use std::sync::Arc;
18use tokio::sync::RwLock;
19
20use crate::channels::*;
21use crate::config::{DrasiLibConfig, RuntimeConfig};
22use crate::inspection::InspectionAPI;
23use crate::lifecycle::LifecycleManager;
24use crate::managers::ComponentLogRegistry;
25use crate::queries::QueryManager;
26use crate::reactions::ReactionManager;
27use crate::sources::SourceManager;
28use crate::state_guard::StateGuard;
29use drasi_core::middleware::MiddlewareTypeRegistry;
30
31/// Core Drasi Server for continuous query processing
32///
33/// `DrasiLib` is the main entry point for embedding Drasi functionality in your application.
34/// It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions
35/// (output destinations) with a reactive event-driven architecture.
36///
37/// # Architecture
38///
39/// - **Sources**: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
40/// - **Queries**: Continuous Cypher or GQL queries that process data changes in real-time
41/// - **Reactions**: Output destinations that receive query results (HTTP, gRPC, Application, Log)
42///
43/// # Lifecycle States
44///
45/// The server progresses through these states:
46/// 1. **Created** (via `builder()`)
47/// 2. **Initialized** (one-time setup, automatic with builder)
48/// 3. **Running** (after `start()`)
49/// 4. **Stopped** (after `stop()`, can be restarted)
50///
51/// Components (sources, queries, reactions) have independent lifecycle states:
52/// - `Stopped`: Component exists but is not processing
53/// - `Starting`: Component is initializing
54/// - `Running`: Component is actively processing
55/// - `Stopping`: Component is shutting down
56///
57/// # Thread Safety
58///
59/// `DrasiLib` is `Clone` (all clones share the same underlying state) and all methods
60/// are thread-safe. You can safely share clones across threads and call methods concurrently.
61///
62/// # Examples
63///
64/// ## Builder Pattern
65///
66/// ```ignore
67/// use drasi_lib::{DrasiLib, Query};
68///
69/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
70/// let core = DrasiLib::builder()
71/// .with_id("my-server")
72/// .with_source(my_source) // Pre-built source instance
73/// .with_query(
74/// Query::cypher("my-query")
75/// .query("MATCH (n:Person) RETURN n.name, n.age")
76/// .from_source("events")
77/// .auto_start(true)
78/// .build()
79/// )
80/// .with_reaction(my_reaction) // Pre-built reaction instance
81/// .build()
82/// .await?;
83///
84/// // Start all auto-start components
85/// core.start().await?;
86///
87/// // List and inspect components
88/// let sources = core.list_sources().await?;
89/// let queries = core.list_queries().await?;
90///
91/// // Stop server
92/// core.stop().await?;
93/// # Ok(())
94/// # }
95/// ```
96///
97/// ## Dynamic Runtime Configuration
98///
99/// ```ignore
100/// use drasi_lib::{DrasiLib, Query};
101///
102/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
103/// let core = DrasiLib::builder()
104/// .with_id("dynamic-server")
105/// .build()
106/// .await?;
107///
108/// core.start().await?;
109///
110/// // Add components at runtime
111/// core.add_source(new_source_instance).await?;
112///
113/// core.add_query(
114/// Query::cypher("new-query")
115/// .query("MATCH (n) RETURN n")
116/// .from_source("new-source")
117/// .auto_start(true)
118/// .build()
119/// ).await?;
120///
121/// // Start/stop individual components
122/// core.stop_query("new-query").await?;
123/// core.start_query("new-query").await?;
124///
125/// // Remove components
126/// core.remove_query("new-query").await?;
127/// core.remove_source("new-source", false).await?;
128/// # Ok(())
129/// # }
130/// ```
131///
132/// ## Restart Behavior
133///
134/// When you call `stop()` and then `start()` again, only components with `auto_start=true`
135/// will be started. Components that were manually started (with `auto_start=false`) will
136/// remain stopped:
137///
138/// ```no_run
139/// # use drasi_lib::DrasiLib;
140/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141/// # let core = DrasiLib::builder().build().await?;
142/// core.start().await?;
143/// // ... only auto_start=true components are running ...
144///
145/// core.stop().await?;
146/// // ... all components stopped ...
147///
148/// core.start().await?;
149/// // ... only auto_start=true components restarted ...
150/// # Ok(())
151/// # }
152/// ```
153pub struct DrasiLib {
154 pub(crate) config: Arc<RuntimeConfig>,
155 pub(crate) source_manager: Arc<SourceManager>,
156 pub(crate) query_manager: Arc<QueryManager>,
157 pub(crate) reaction_manager: Arc<ReactionManager>,
158 pub(crate) running: Arc<RwLock<bool>>,
159 pub(crate) state_guard: StateGuard,
160 // Inspection API for querying server state
161 pub(crate) inspection: InspectionAPI,
162 // Lifecycle manager for orchestrating component lifecycle
163 pub(crate) lifecycle: Arc<RwLock<LifecycleManager>>,
164 // Middleware registry for source middleware
165 pub(crate) middleware_registry: Arc<MiddlewareTypeRegistry>,
166 // Component log registry for live log streaming
167 pub(crate) log_registry: Arc<ComponentLogRegistry>,
168}
169
170impl Clone for DrasiLib {
171 fn clone(&self) -> Self {
172 Self {
173 config: Arc::clone(&self.config),
174 source_manager: Arc::clone(&self.source_manager),
175 query_manager: Arc::clone(&self.query_manager),
176 reaction_manager: Arc::clone(&self.reaction_manager),
177 running: Arc::clone(&self.running),
178 state_guard: self.state_guard.clone(),
179 inspection: self.inspection.clone(),
180 lifecycle: Arc::clone(&self.lifecycle),
181 middleware_registry: Arc::clone(&self.middleware_registry),
182 log_registry: Arc::clone(&self.log_registry),
183 }
184 }
185}
186
187impl DrasiLib {
188 // ============================================================================
189 // Construction and Initialization
190 // ============================================================================
191
192 /// Create an Arc-wrapped reference to self
193 ///
194 /// Since DrasiLib contains all Arc-wrapped fields, cloning is cheap
195 /// (just increments ref counts), but this helper makes the intent clearer
196 /// and provides a single place to document this pattern.
197 pub(crate) fn as_arc(&self) -> Arc<Self> {
198 Arc::new(self.clone())
199 }
200
201 /// Internal constructor - creates uninitialized server
202 /// Use `builder()` instead
203 pub(crate) fn new(config: Arc<RuntimeConfig>) -> Self {
204 let (channels, receivers) = EventChannels::new();
205
206 // Use the shared global log registry.
207 // Since tracing uses a single global subscriber, all DrasiLib instances
208 // share the same log registry. This ensures logs are properly routed
209 // regardless of how many DrasiLib instances are created.
210 let log_registry = crate::managers::get_or_init_global_registry();
211
212 // Get the instance ID from config for log routing
213 let instance_id = config.id.clone();
214
215 let source_manager = Arc::new(SourceManager::new(
216 &instance_id,
217 channels.component_event_tx.clone(),
218 log_registry.clone(),
219 ));
220
221 // Initialize middleware registry and register all standard middleware factories
222 let mut middleware_registry = MiddlewareTypeRegistry::new();
223
224 #[cfg(feature = "middleware-jq")]
225 middleware_registry.register(Arc::new(drasi_middleware::jq::JQFactory::new()));
226
227 #[cfg(feature = "middleware-map")]
228 middleware_registry.register(Arc::new(drasi_middleware::map::MapFactory::new()));
229
230 #[cfg(feature = "middleware-unwind")]
231 middleware_registry.register(Arc::new(drasi_middleware::unwind::UnwindFactory::new()));
232
233 #[cfg(feature = "middleware-relabel")]
234 middleware_registry.register(Arc::new(
235 drasi_middleware::relabel::RelabelMiddlewareFactory::new(),
236 ));
237
238 #[cfg(feature = "middleware-decoder")]
239 middleware_registry.register(Arc::new(drasi_middleware::decoder::DecoderFactory::new()));
240
241 #[cfg(feature = "middleware-parse-json")]
242 middleware_registry.register(Arc::new(
243 drasi_middleware::parse_json::ParseJsonFactory::new(),
244 ));
245
246 #[cfg(feature = "middleware-promote")]
247 middleware_registry.register(Arc::new(
248 drasi_middleware::promote::PromoteMiddlewareFactory::new(),
249 ));
250
251 let middleware_registry = Arc::new(middleware_registry);
252
253 let query_manager = Arc::new(QueryManager::new(
254 &instance_id,
255 channels.component_event_tx.clone(),
256 source_manager.clone(),
257 config.index_factory.clone(),
258 middleware_registry.clone(),
259 log_registry.clone(),
260 ));
261
262 let reaction_manager = Arc::new(ReactionManager::new(
263 &instance_id,
264 channels.component_event_tx.clone(),
265 log_registry.clone(),
266 ));
267
268 let state_guard = StateGuard::new();
269
270 let inspection = InspectionAPI::new(
271 source_manager.clone(),
272 query_manager.clone(),
273 reaction_manager.clone(),
274 state_guard.clone(),
275 config.clone(),
276 );
277
278 let lifecycle = Arc::new(RwLock::new(LifecycleManager::new(
279 config.clone(),
280 source_manager.clone(),
281 query_manager.clone(),
282 reaction_manager.clone(),
283 Some(receivers),
284 )));
285
286 Self {
287 config,
288 source_manager,
289 query_manager,
290 reaction_manager,
291 running: Arc::new(RwLock::new(false)),
292 state_guard,
293 inspection,
294 lifecycle,
295 middleware_registry,
296 log_registry,
297 }
298 }
299
300 /// Internal initialization - performs one-time setup
301 /// This is called internally by builder and config loaders
302 pub(crate) async fn initialize(&mut self) -> Result<()> {
303 let already_initialized = self.state_guard.is_initialized().await;
304 if already_initialized {
305 info!("Server already initialized, skipping initialization");
306 return Ok(());
307 }
308
309 info!("Initializing Drasi Server Core");
310
311 // Inject QueryProvider into ReactionManager
312 // This allows reactions to access queries when they start
313 let query_provider: Arc<dyn crate::reactions::QueryProvider> = self.as_arc();
314 self.reaction_manager
315 .inject_query_provider(query_provider)
316 .await;
317
318 // Inject StateStoreProvider into SourceManager and ReactionManager
319 // This allows sources and reactions to persist state
320 let state_store = self.config.state_store_provider.clone();
321 self.source_manager
322 .inject_state_store(state_store.clone())
323 .await;
324 self.reaction_manager.inject_state_store(state_store).await;
325
326 // Load configuration
327 let lifecycle = self.lifecycle.read().await;
328 lifecycle.load_configuration().await?;
329 drop(lifecycle);
330
331 // Start event processors (one-time)
332 let mut lifecycle = self.lifecycle.write().await;
333 lifecycle.start_event_processors().await;
334 drop(lifecycle);
335
336 self.state_guard.mark_initialized().await;
337 info!("Drasi Server Core initialized successfully");
338 Ok(())
339 }
340
341 // ============================================================================
342 // Lifecycle Operations (start/stop)
343 // ============================================================================
344
345 /// Start the server and all auto-start components
346 ///
347 /// This starts all components (sources, queries, reactions) that have `auto_start` set to `true`,
348 /// as well as any components that were running when `stop()` was last called.
349 ///
350 /// Components are started in dependency order: Sources → Queries → Reactions
351 ///
352 /// # Errors
353 ///
354 /// Returns an error if:
355 /// * The server is not initialized (`DrasiError::InvalidState`)
356 /// * The server is already running (`anyhow::Error`)
357 /// * Any component fails to start (propagated from component)
358 ///
359 /// # Examples
360 ///
361 /// ```no_run
362 /// # use drasi_lib::DrasiLib;
363 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
364 /// let core = DrasiLib::builder()
365 /// .with_id("my-server")
366 /// .build()
367 /// .await?;
368 ///
369 /// // Start server and all auto-start components
370 /// core.start().await?;
371 ///
372 /// assert!(core.is_running().await);
373 /// # Ok(())
374 /// # }
375 /// ```
376 pub async fn start(&self) -> Result<()> {
377 let mut running = self.running.write().await;
378 if *running {
379 warn!("Server is already running");
380 return Err(anyhow!("Server is already running"));
381 }
382
383 info!("Starting Drasi Server Core");
384
385 // Ensure initialized
386 if !self.state_guard.is_initialized().await {
387 return Err(anyhow!("Server must be initialized before starting"));
388 }
389
390 // Start all configured components
391 let lifecycle = self.lifecycle.read().await;
392 lifecycle.start_components().await?;
393
394 *running = true;
395 info!("Drasi Server Core started successfully");
396
397 Ok(())
398 }
399
400 /// Stop the server and all running components
401 ///
402 /// This stops all currently running components (sources, queries, reactions).
403 /// Components are stopped in reverse dependency order: Reactions → Queries → Sources
404 ///
405 /// On the next `start()`, only components with `auto_start=true` will be restarted.
406 ///
407 /// # Errors
408 ///
409 /// Returns an error if:
410 /// * The server is not running (`anyhow::Error`)
411 /// * Any component fails to stop (logged as error, but doesn't prevent other components from stopping)
412 ///
413 /// # Examples
414 ///
415 /// ```no_run
416 /// # use drasi_lib::DrasiLib;
417 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
418 /// # let core = DrasiLib::builder().build().await?;
419 /// # core.start().await?;
420 /// // Stop server and all running components
421 /// core.stop().await?;
422 ///
423 /// assert!(!core.is_running().await);
424 ///
425 /// // Only auto_start=true components will be restarted
426 /// core.start().await?;
427 /// # Ok(())
428 /// # }
429 /// ```
430 pub async fn stop(&self) -> Result<()> {
431 let mut running = self.running.write().await;
432 if !*running {
433 warn!("Server is already stopped");
434 return Err(anyhow!("Server is already stopped"));
435 }
436
437 info!("Stopping Drasi Server Core");
438
439 // Stop all components
440 let lifecycle = self.lifecycle.read().await;
441 lifecycle.stop_all_components().await?;
442
443 *running = false;
444 info!("Drasi Server Core stopped successfully");
445
446 Ok(())
447 }
448
449 // ============================================================================
450 // Handle Access (for advanced usage)
451 // ============================================================================
452
453 /// Get direct access to the query manager (advanced usage)
454 ///
455 /// This provides low-level access to the query manager for advanced scenarios.
456 /// Most users should use the higher-level methods like `get_query_info()`,
457 /// `start_query()`, etc. instead.
458 ///
459 /// # Thread Safety
460 ///
461 /// The returned reference is thread-safe and can be used across threads.
462 pub fn query_manager(&self) -> &QueryManager {
463 &self.query_manager
464 }
465
466 /// Get access to the middleware registry
467 ///
468 /// Returns a reference to the middleware type registry that contains all registered
469 /// middleware factories. The registry is pre-populated with all standard middleware
470 /// types (jq, map, unwind, relabel, decoder, parse_json, promote).
471 ///
472 /// # Thread Safety
473 ///
474 /// The returned Arc can be cloned and used across threads.
475 ///
476 /// # Examples
477 ///
478 /// ```no_run
479 /// # use drasi_lib::DrasiLib;
480 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
481 /// let core = DrasiLib::builder().build().await?;
482 /// let registry = core.middleware_registry();
483 /// // Use registry to create middleware instances
484 /// # Ok(())
485 /// # }
486 /// ```
487 pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry> {
488 Arc::clone(&self.middleware_registry)
489 }
490
491 // ============================================================================
492 // Configuration Snapshot
493 // ============================================================================
494
495 /// Get a complete configuration snapshot of all components
496 ///
497 /// Returns the full server configuration including all queries with their complete configurations.
498 /// Note: Sources and reactions are now instance-based and not stored in config.
499 /// Use `list_sources()` and `list_reactions()` for runtime information about these components.
500 ///
501 /// # Example
502 /// ```no_run
503 /// # use drasi_lib::DrasiLib;
504 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
505 /// let config = core.get_current_config().await?;
506 /// println!("Server has {} queries", config.queries.len());
507 /// # Ok(())
508 /// # }
509 /// ```
510 pub async fn get_current_config(&self) -> crate::error::Result<DrasiLibConfig> {
511 self.inspection.get_current_config().await
512 }
513
514 // ============================================================================
515 // Builder and Config File Loading
516 // ============================================================================
517
518 /// Create a builder for configuring a new DrasiLib instance.
519 ///
520 /// The builder provides a fluent API for adding queries and source/reaction instances.
521 /// Note: Sources and reactions are now instance-based. Use `with_source()` and `with_reaction()`
522 /// to add pre-built instances.
523 ///
524 /// # Example
525 /// ```no_run
526 /// use drasi_lib::{DrasiLib, Query};
527 ///
528 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
529 /// let core = DrasiLib::builder()
530 /// .with_id("my-server")
531 /// .with_query(
532 /// Query::cypher("my-query")
533 /// .query("MATCH (n) RETURN n")
534 /// .from_source("events")
535 /// .build()
536 /// )
537 /// // Use .with_source(source_instance) and .with_reaction(reaction_instance)
538 /// // for pre-built source and reaction instances
539 /// .build()
540 /// .await?;
541 ///
542 /// core.start().await?;
543 /// # Ok(())
544 /// # }
545 /// ```
546 pub fn builder() -> crate::builder::DrasiLibBuilder {
547 crate::builder::DrasiLibBuilder::new()
548 }
549
550 // ============================================================================
551 // Server Status
552 // ============================================================================
553
554 /// Check if the server is currently running
555 ///
556 /// Returns `true` if `start()` has been called and the server is actively processing,
557 /// `false` if the server is stopped or has not been started yet.
558 ///
559 /// # Thread Safety
560 ///
561 /// This method is thread-safe and can be called concurrently.
562 ///
563 /// # Examples
564 ///
565 /// ```no_run
566 /// # use drasi_lib::DrasiLib;
567 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
568 /// let core = DrasiLib::builder().build().await?;
569 ///
570 /// assert!(!core.is_running().await); // Not started yet
571 ///
572 /// core.start().await?;
573 /// assert!(core.is_running().await); // Now running
574 ///
575 /// core.stop().await?;
576 /// assert!(!core.is_running().await); // Stopped again
577 /// # Ok(())
578 /// # }
579 /// ```
580 pub async fn is_running(&self) -> bool {
581 *self.running.read().await
582 }
583
584 /// Get the runtime configuration
585 ///
586 /// Returns a reference to the immutable runtime configuration containing server settings
587 /// and all component configurations. This is the configuration that was provided during
588 /// initialization (via builder, config file, or config string).
589 ///
590 /// For a current snapshot of the configuration including runtime additions, use
591 /// [`get_current_config()`](Self::get_current_config) instead.
592 ///
593 /// # Thread Safety
594 ///
595 /// This method is thread-safe and can be called concurrently.
596 ///
597 /// # Examples
598 ///
599 /// ```no_run
600 /// # use drasi_lib::DrasiLib;
601 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
602 /// let core = DrasiLib::builder()
603 /// .with_id("my-server")
604 /// .build()
605 /// .await?;
606 ///
607 /// let config = core.get_config();
608 /// println!("Server ID: {}", config.id);
609 /// println!("Number of queries: {}", config.queries.len());
610 /// # Ok(())
611 /// # }
612 /// ```
613 pub fn get_config(&self) -> &RuntimeConfig {
614 &self.config
615 }
616}
617
618// ============================================================================
619// QueryProvider Trait Implementation
620// ============================================================================
621
622// Implement QueryProvider trait for DrasiLib
623// This breaks the circular dependency by providing a minimal interface for reactions
624#[async_trait::async_trait]
625impl crate::reactions::QueryProvider for DrasiLib {
626 async fn get_query_instance(&self, id: &str) -> Result<Arc<dyn crate::queries::Query>> {
627 self.query_manager
628 .get_query_instance(id)
629 .await
630 .map_err(|e| anyhow::anyhow!(e))
631 }
632}
633
634// ============================================================================
635// Tests
636// ============================================================================
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641
642 async fn create_test_server() -> DrasiLib {
643 DrasiLib::builder()
644 .with_id("test-server")
645 .build()
646 .await
647 .expect("Failed to build server")
648 }
649
650 #[tokio::test]
651 async fn test_middleware_registry_is_initialized() {
652 let core = create_test_server().await;
653
654 let registry = core.middleware_registry();
655
656 // Verify that middleware factories are registered when their features are enabled
657 #[cfg(feature = "middleware-jq")]
658 assert!(
659 registry.get("jq").is_some(),
660 "JQ factory should be registered"
661 );
662 #[cfg(feature = "middleware-map")]
663 assert!(
664 registry.get("map").is_some(),
665 "Map factory should be registered"
666 );
667 #[cfg(feature = "middleware-unwind")]
668 assert!(
669 registry.get("unwind").is_some(),
670 "Unwind factory should be registered"
671 );
672 #[cfg(feature = "middleware-relabel")]
673 assert!(
674 registry.get("relabel").is_some(),
675 "Relabel factory should be registered"
676 );
677 #[cfg(feature = "middleware-decoder")]
678 assert!(
679 registry.get("decoder").is_some(),
680 "Decoder factory should be registered"
681 );
682 #[cfg(feature = "middleware-parse-json")]
683 assert!(
684 registry.get("parse_json").is_some(),
685 "ParseJson factory should be registered"
686 );
687 #[cfg(feature = "middleware-promote")]
688 assert!(
689 registry.get("promote").is_some(),
690 "Promote factory should be registered"
691 );
692 }
693
694 #[tokio::test]
695 async fn test_middleware_registry_arc_sharing() {
696 let core = create_test_server().await;
697
698 let registry1 = core.middleware_registry();
699 let registry2 = core.middleware_registry();
700
701 // Both Arc instances should point to the same underlying registry
702 // We can't directly test Arc equality, but we can verify both work
703 // Test with any available middleware feature
704 #[cfg(feature = "middleware-jq")]
705 {
706 assert!(registry1.get("jq").is_some());
707 assert!(registry2.get("jq").is_some());
708 }
709 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
710 {
711 assert!(registry1.get("map").is_some());
712 assert!(registry2.get("map").is_some());
713 }
714 #[cfg(all(
715 feature = "middleware-decoder",
716 not(feature = "middleware-jq"),
717 not(feature = "middleware-map")
718 ))]
719 {
720 assert!(registry1.get("decoder").is_some());
721 assert!(registry2.get("decoder").is_some());
722 }
723 }
724
725 #[tokio::test]
726 async fn test_middleware_registry_accessible_before_start() {
727 let core = create_test_server().await;
728
729 // Should be accessible even before server is started
730 assert!(!core.is_running().await);
731 let registry = core.middleware_registry();
732
733 // Verify registry is accessible (test with any available middleware)
734 #[cfg(feature = "middleware-jq")]
735 assert!(registry.get("jq").is_some());
736 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
737 assert!(registry.get("map").is_some());
738
739 // If no middleware features are enabled, just verify the registry exists
740 #[cfg(not(any(
741 feature = "middleware-jq",
742 feature = "middleware-map",
743 feature = "middleware-decoder",
744 feature = "middleware-parse-json",
745 feature = "middleware-promote",
746 feature = "middleware-relabel",
747 feature = "middleware-unwind"
748 )))]
749 {
750 // Registry should exist even with no middleware
751 let _ = registry;
752 }
753 }
754
755 #[tokio::test]
756 async fn test_middleware_registry_accessible_after_start() {
757 let core = create_test_server().await;
758
759 core.start().await.expect("Failed to start server");
760
761 // Should be accessible after server is started
762 assert!(core.is_running().await);
763 let registry = core.middleware_registry();
764
765 // Verify registry is accessible (test with any available middleware)
766 #[cfg(feature = "middleware-jq")]
767 assert!(registry.get("jq").is_some());
768 #[cfg(all(feature = "middleware-map", not(feature = "middleware-jq")))]
769 assert!(registry.get("map").is_some());
770
771 // If no middleware features are enabled, just verify the registry exists
772 #[cfg(not(any(
773 feature = "middleware-jq",
774 feature = "middleware-map",
775 feature = "middleware-decoder",
776 feature = "middleware-parse-json",
777 feature = "middleware-promote",
778 feature = "middleware-relabel",
779 feature = "middleware-unwind"
780 )))]
781 {
782 // Registry should exist even with no middleware
783 let _ = registry;
784 }
785
786 core.stop().await.expect("Failed to stop server");
787 }
788}