use core::any::TypeId;
use core::fmt::Debug;
use core::marker::PhantomData;
extern crate alloc;
use alloc::vec::Vec;
use hashbrown::HashMap;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, sync::Arc};
#[cfg(all(not(feature = "std"), feature = "alloc"))]
use alloc::string::{String, ToString};
#[cfg(feature = "std")]
use std::{boxed::Box, sync::Arc};
use crate::extensions::Extensions;
use crate::graph::DependencyGraph;
/// Boxed, type-erased `on_start` callback (std build): takes the runtime
/// handle and returns a pinned boxed future that is spawned during `build`.
#[cfg(feature = "std")]
type StartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
/// Boxed, type-erased `on_start` callback (no_std build). Identical shape to
/// `StartFnType`; a separate alias so each cfg branch has its own name.
#[cfg(not(feature = "std"))]
type NoStdStartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
use crate::record_id::{RecordId, RecordKey, StringKey};
use crate::typed_api::{RecordRegistrar, RecordT};
use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
use crate::{DbError, DbResult};
/// One outbound route collected for a connector scheme, in order:
/// destination/topic string, consumer handle, serializer, per-link
/// `(key, value)` config pairs, and an optional dynamic topic provider.
#[cfg(feature = "alloc")]
pub type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerFn,
    Vec<(String, String)>,
    Option<crate::connector::TopicProviderFn>,
);
/// Marker type used as the default `R` parameter of [`AimDbBuilder`] before a
/// real executor is attached via `.runtime()`.
pub struct NoRuntime;
/// Shared state behind an [`AimDb`] handle: all record storage plus the
/// lookup indices frozen once by `AimDbBuilder::build`.
pub struct AimDbInner {
    // Dense storage; `RecordId::index()` indexes directly into this Vec.
    storages: Vec<Box<dyn AnyRecord>>,
    // Registration key -> record id.
    by_key: HashMap<StringKey, RecordId>,
    // Payload TypeId -> all record ids carrying that type.
    by_type: HashMap<TypeId, Vec<RecordId>>,
    // Parallel to `storages`: payload TypeId per record.
    types: Vec<TypeId>,
    // Parallel to `storages`: registration key per record.
    keys: Vec<StringKey>,
    // Validated record dependency graph (drives spawn ordering in `build`).
    dependency_graph: crate::graph::DependencyGraph,
    pub(crate) extensions: Extensions,
}
impl AimDbInner {
    /// Resolves a typed record key to its `RecordId`, if registered.
    #[inline]
    pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
        self.by_key.get(key.as_str()).copied()
    }
    /// Resolves a raw string key to its `RecordId`, if registered.
    #[inline]
    pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
        self.by_key.get(name).copied()
    }
    /// Returns the type-erased storage for `id`, if the id is in range.
    #[inline]
    pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
        self.storages.get(id.index()).map(|b| b.as_ref())
    }
    /// Returns the registration key for `id`, if the id is in range.
    #[inline]
    pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
        self.keys.get(id.index())
    }
    /// All record ids whose payload type is `T` (empty slice if none).
    pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
        self.by_type
            .get(&TypeId::of::<T>())
            .map(|v| v.as_slice())
            .unwrap_or(&[])
    }
    /// Number of registered records.
    #[inline]
    pub fn record_count(&self) -> usize {
        self.storages.len()
    }
    /// The dependency graph built and validated at `AimDbBuilder::build` time.
    #[inline]
    pub fn dependency_graph(&self) -> &crate::graph::DependencyGraph {
        &self.dependency_graph
    }
    /// Looks up a record by key and downcasts it to `TypedRecord<T, R>`.
    ///
    /// # Errors
    /// `RecordKeyNotFound` if the key is unknown; otherwise whatever
    /// [`Self::get_typed_record_by_id`] returns.
    pub fn get_typed_record_by_key<T, R>(
        &self,
        key: impl AsRef<str>,
    ) -> DbResult<&TypedRecord<T, R>>
    where
        T: Send + 'static + Debug + Clone,
        R: aimdb_executor::Spawn + 'static,
    {
        let key_str = key.as_ref();
        // `ok_or_else` so the error (and its String allocation on std) is
        // only constructed on the miss path.
        let id = self.resolve_str(key_str).ok_or_else(|| {
            #[cfg(feature = "std")]
            {
                DbError::RecordKeyNotFound {
                    key: key_str.to_string(),
                }
            }
            #[cfg(not(feature = "std"))]
            {
                DbError::RecordKeyNotFound { _key: () }
            }
        })?;
        self.get_typed_record_by_id::<T, R>(id)
    }
    /// Downcasts the storage at `id` to `TypedRecord<T, R>`.
    ///
    /// # Errors
    /// `InvalidRecordId` if `id` is out of range, `TypeMismatch` if the record
    /// was registered with a different payload type, `InvalidOperation` if the
    /// downcast fails (e.g. the runtime type `R` differs).
    pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
    where
        T: Send + 'static + Debug + Clone,
        R: aimdb_executor::Spawn + 'static,
    {
        if id.index() >= self.storages.len() {
            return Err(DbError::InvalidRecordId { id: id.raw() });
        }
        // Cheap TypeId comparison before attempting the actual downcast.
        let expected = TypeId::of::<T>();
        let actual = self.types[id.index()];
        if expected != actual {
            #[cfg(feature = "std")]
            return Err(DbError::TypeMismatch {
                record_id: id.raw(),
                expected_type: core::any::type_name::<T>().to_string(),
            });
            #[cfg(not(feature = "std"))]
            return Err(DbError::TypeMismatch {
                record_id: id.raw(),
                _expected_type: (),
            });
        }
        let record = &self.storages[id.index()];
        // `ok_or_else` avoids allocating the two error Strings on the
        // success path (clippy `or_fun_call`).
        #[cfg(feature = "std")]
        let typed_record = record
            .as_typed::<T, R>()
            .ok_or_else(|| DbError::InvalidOperation {
                operation: "get_typed_record_by_id".to_string(),
                reason: "type mismatch during downcast".to_string(),
            })?;
        // The no-std error payload is zero-sized, so eager `ok_or` is free.
        #[cfg(not(feature = "std"))]
        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
            _operation: (),
            _reason: (),
        })?;
        Ok(typed_record)
    }
    /// Collects metadata for every record (std only; used by the remote API).
    #[cfg(feature = "std")]
    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
        self.storages
            .iter()
            .enumerate()
            .map(|(i, record)| {
                let id = RecordId::new(i as u32);
                let type_id = self.types[i];
                let key = self.keys[i];
                record.collect_metadata(type_id, key, id)
            })
            .collect()
    }
    /// Latest value of `record_key` serialized to JSON, if the key exists and
    /// a value is available.
    #[cfg(feature = "std")]
    pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
        let id = self.resolve_str(record_key)?;
        self.storages.get(id.index())?.latest_json()
    }
    /// Deserializes `json_value` and writes it into the record at `record_key`.
    ///
    /// # Errors
    /// `RecordKeyNotFound` if the key is unknown, or whatever the record's
    /// `set_from_json` returns.
    #[cfg(feature = "std")]
    pub fn set_record_from_json(
        &self,
        record_key: &str,
        json_value: serde_json::Value,
    ) -> DbResult<()> {
        let id = self
            .resolve_str(record_key)
            .ok_or_else(|| DbError::RecordKeyNotFound {
                key: record_key.to_string(),
            })?;
        self.storages[id.index()].set_from_json(json_value)
    }
}
/// Staged configuration for an [`AimDb`]. `R` is the executor runtime and
/// defaults to [`NoRuntime`] until `.runtime()` is called.
pub struct AimDbBuilder<R = NoRuntime> {
    // (key, payload TypeId, type-erased record) in registration order.
    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,
    runtime: Option<Arc<R>>,
    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
    // Per-record task spawners, type-erased; keyed so `build` can run them
    // in dependency (topological) order.
    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,
    // Type-erased `on_start` callbacks (StartFnType / NoStdStartFnType).
    start_fns: Vec<Box<dyn core::any::Any + Send>>,
    extensions: Extensions,
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,
    // Ties the builder to `R` even when no runtime-typed field is populated.
    _phantom: PhantomData<R>,
}
impl AimDbBuilder<NoRuntime> {
    /// Creates an empty builder with no runtime attached yet.
    ///
    /// A runtime must be supplied via [`Self::runtime`] before the database
    /// can be built.
    pub fn new() -> Self {
        AimDbBuilder {
            runtime: None,
            records: Vec::new(),
            spawn_fns: Vec::new(),
            start_fns: Vec::new(),
            connector_builders: Vec::new(),
            extensions: Extensions::new(),
            #[cfg(feature = "std")]
            remote_config: None,
            _phantom: PhantomData,
        }
    }
    /// Attaches an executor runtime, converting this builder into one
    /// parameterised over `R`.
    ///
    /// Records, `on_start` hooks, extensions and (on std) the remote config
    /// carry over. Connector builders and spawn functions are typed against
    /// the old runtime parameter and therefore start out empty.
    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
    where
        R: aimdb_executor::Spawn + 'static,
    {
        AimDbBuilder {
            runtime: Some(rt),
            records: self.records,
            start_fns: self.start_fns,
            // Cannot cross the `NoRuntime` -> `R` type change; reset.
            spawn_fns: Vec::new(),
            connector_builders: Vec::new(),
            extensions: self.extensions,
            #[cfg(feature = "std")]
            remote_config: self.remote_config,
            _phantom: PhantomData,
        }
    }
}
impl<R> AimDbBuilder<R>
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Shared access to the builder's extension map.
    pub fn extensions(&self) -> &Extensions {
        &self.extensions
    }
    /// Mutable access to the builder's extension map.
    pub fn extensions_mut(&mut self) -> &mut Extensions {
        &mut self.extensions
    }
    /// Registers a callback whose future is spawned on the runtime at the end
    /// of [`Self::build`].
    ///
    /// The callback is boxed into the cfg-matching alias and then boxed again
    /// as `dyn Any` so `start_fns` can stay runtime-agnostic; `build`
    /// downcasts it back before spawning.
    pub fn on_start<F, Fut>(&mut self, f: F) -> &mut Self
    where
        F: FnOnce(Arc<R>) -> Fut + Send + 'static,
        Fut: core::future::Future<Output = ()> + Send + 'static,
    {
        #[cfg(feature = "std")]
        let boxed: StartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
        #[cfg(not(feature = "std"))]
        let boxed: NoStdStartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
        self.start_fns.push(Box::new(boxed));
        self
    }
    /// Adds a connector builder; `build` invokes it after records are spawned.
    pub fn with_connector(
        mut self,
        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
    ) -> Self {
        self.connector_builders.push(Box::new(builder));
        self
    }
    /// Enables the remote-access supervisor with the given config (std only).
    #[cfg(feature = "std")]
    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
        self.remote_config = Some(config);
        self
    }
    /// Registers (or re-opens) the record stored under `key` with payload type
    /// `T`, and lets `f` configure it through a [`RecordRegistrar`].
    ///
    /// # Panics
    /// If `key` was already registered with a different payload type, or if
    /// the internal downcast fails (registry invariant violation).
    pub fn configure<T>(
        &mut self,
        key: impl RecordKey,
        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
    ) -> &mut Self
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
        let type_id = TypeId::of::<T>();
        // Re-configuring an existing key mutates that record in place.
        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
        let (rec, is_new_record) = match record_index {
            Some(idx) => {
                let (_, existing_type, record) = &mut self.records[idx];
                assert!(
                    *existing_type == type_id,
                    "StringKey '{}' already registered with different type",
                    record_key.as_str()
                );
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    false,
                )
            }
            None => {
                self.records
                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
                let (_, _, record) = self.records.last_mut().unwrap();
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    true,
                )
            }
        };
        let mut reg = RecordRegistrar {
            rec,
            connector_builders: &self.connector_builders,
            #[cfg(feature = "alloc")]
            record_key: record_key.as_str().to_string(),
            extensions: &self.extensions,
        };
        f(&mut reg);
        if is_new_record {
            // Capture a type-erased closure that spawns this record's tasks;
            // `build` runs these in topological (dependency) order. The
            // closure re-resolves the record through `db` so it stays valid
            // after the builder's storage is frozen.
            let spawn_key = record_key;
            #[allow(clippy::type_complexity)]
            let spawn_fn: Box<
                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
                use crate::typed_record::RecordSpawner;
                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
                #[cfg(feature = "alloc")]
                let key = db
                    .inner()
                    .key_for(id)
                    .map(|k| k.as_str().to_string())
                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
                #[cfg(not(feature = "alloc"))]
                let key = "";
                // Shadow the owned String with a &str so both cfg branches
                // hand `spawn_all_tasks` the same type.
                #[cfg(feature = "alloc")]
                let key = key.as_str();
                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
            });
            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
        }
        self
    }
    /// Registers record type `T` under its fully-qualified type name.
    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
    where
        T: RecordT<R>,
    {
        let key = StringKey::new(core::any::type_name::<T>());
        self.configure::<T>(key, |reg| T::register(reg, cfg))
    }
    /// Registers record type `T` under an explicit key.
    pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
    where
        T: RecordT<R>,
    {
        self.configure::<T>(key, |reg| T::register(reg, cfg))
    }
    /// Builds the database, then parks forever so background tasks keep
    /// running. Returns only if the build itself fails.
    pub async fn run(self) -> DbResult<()> {
        #[cfg(feature = "tracing")]
        tracing::info!("Building database and spawning background tasks...");
        let _db = self.build().await?;
        #[cfg(feature = "tracing")]
        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
        core::future::pending::<()>().await;
        Ok(())
    }
    /// Finalizes the builder: validates records, freezes storage and indices,
    /// builds the dependency graph, spawns per-record tasks in topological
    /// order, then starts the remote supervisor, connectors and `on_start`
    /// hooks.
    ///
    /// # Errors
    /// Record validation failure, missing runtime, duplicate key, graph
    /// validation failure, or any spawn/connector error.
    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
    pub async fn build(self) -> DbResult<AimDb<R>> {
        use crate::DbError;
        // 1. Validate every record before committing anything.
        for (key, _, record) in &self.records {
            record.validate().map_err(|_msg| {
                // Keep `key` captured in the no-std branch where it is unused.
                let _ = &key;
                #[cfg(feature = "std")]
                {
                    DbError::RuntimeError {
                        message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
                    }
                }
                #[cfg(not(feature = "std"))]
                {
                    DbError::RuntimeError { _message: () }
                }
            })?;
        }
        // NOTE(review): `ok_or` builds this error eagerly even when the
        // runtime is set; `ok_or_else` would avoid the allocation on std.
        let runtime = self.runtime.ok_or({
            #[cfg(feature = "std")]
            {
                DbError::RuntimeError {
                    message: "runtime not set (use .runtime())".into(),
                }
            }
            #[cfg(not(feature = "std"))]
            {
                DbError::RuntimeError { _message: () }
            }
        })?;
        // 2. Freeze registration order into dense, index-based storage.
        let record_count = self.records.len();
        let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
        let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
        let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
        let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
        let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
        for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
            let id = RecordId::new(i as u32);
            if by_key.contains_key(&key) {
                #[cfg(feature = "std")]
                return Err(DbError::DuplicateRecordKey {
                    key: key.as_str().to_string(),
                });
                #[cfg(not(feature = "std"))]
                return Err(DbError::DuplicateRecordKey { _key: () });
            }
            storages.push(record);
            by_key.insert(key, id);
            by_type.entry(type_id).or_default().push(id);
            types.push(type_id);
            keys.push(key);
        }
        // 3. Build and validate the dependency graph from per-record info.
        let record_infos: Vec<crate::graph::RecordGraphInfo> = storages
            .iter()
            .enumerate()
            .map(|(idx, record)| {
                let key = keys[idx].as_str().to_string();
                let origin = record.record_origin();
                let (buffer_type, buffer_capacity) = record.buffer_info();
                crate::graph::RecordGraphInfo {
                    key,
                    origin,
                    buffer_type,
                    buffer_capacity,
                    tap_count: record.consumer_count(),
                    has_outbound_link: record.outbound_connector_count() > 0,
                }
            })
            .collect();
        let dependency_graph = DependencyGraph::build_and_validate(&record_infos)?;
        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Dependency graph built successfully ({} nodes, {} edges, topo order: {:?})",
            dependency_graph.nodes.len(),
            dependency_graph.edges.len(),
            dependency_graph.topo_order
        );
        let inner = Arc::new(AimDbInner {
            storages,
            by_key,
            by_type,
            types,
            keys,
            dependency_graph,
            extensions: self.extensions,
        });
        let db = Arc::new(AimDb {
            inner: inner.clone(),
            runtime: runtime.clone(),
        });
        #[cfg(feature = "tracing")]
        tracing::info!(
            "Spawning producer services and tap observers for {} records",
            self.spawn_fns.len()
        );
        // 4. Spawn each record's tasks in topological order so upstream
        //    records start before their dependents.
        let mut spawn_fn_map: HashMap<StringKey, Box<dyn core::any::Any + Send>> =
            self.spawn_fns.into_iter().collect();
        for key_str in inner.dependency_graph.topo_order() {
            // Linear scan maps the graph's &str key back to its StringKey.
            let key = match inner.by_key.keys().find(|k| k.as_str() == key_str) {
                Some(k) => *k,
                None => continue, };
            // Records without a registered spawn fn are simply skipped.
            let spawn_fn_any = match spawn_fn_map.remove(&key) {
                Some(f) => f,
                None => continue, };
            // NOTE(review): eager `ok_or` again — `ok_or_else` would be lazier.
            let id = inner.resolve(&key).ok_or({
                #[cfg(feature = "std")]
                {
                    DbError::RecordKeyNotFound {
                        key: key.as_str().to_string(),
                    }
                }
                #[cfg(not(feature = "std"))]
                {
                    DbError::RecordKeyNotFound { _key: () }
                }
            })?;
            // Must exactly match the boxed type produced in `configure`.
            type SpawnFnType<R> =
                Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
            let spawn_fn = spawn_fn_any
                .downcast::<SpawnFnType<R>>()
                .expect("spawn function type mismatch");
            (*spawn_fn)(&runtime, &db, id)?;
        }
        #[cfg(feature = "tracing")]
        tracing::info!("Automatic spawning complete");
        // 5. Optional remote-access supervisor (std only): mark records the
        //    security policy allows as writable, then start the supervisor.
        #[cfg(feature = "std")]
        if let Some(remote_cfg) = self.remote_config {
            #[cfg(feature = "tracing")]
            tracing::info!(
                "Spawning remote access supervisor on socket: {}",
                remote_cfg.socket_path.display()
            );
            let writable_keys = remote_cfg.security_policy.writable_records();
            for key_str in writable_keys {
                if let Some(id) = inner.resolve_str(&key_str) {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Marking record '{}' as writable", key_str);
                    inner.storages[id.index()].set_writable_erased(true);
                }
            }
            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
            #[cfg(feature = "tracing")]
            tracing::info!("Remote access supervisor spawned successfully");
        }
        // 6. Connector builders run after records so routes can resolve.
        for builder in self.connector_builders {
            #[cfg(feature = "tracing")]
            let scheme = {
                #[cfg(feature = "std")]
                {
                    builder.scheme().to_string()
                }
                #[cfg(not(feature = "std"))]
                {
                    alloc::string::String::from(builder.scheme())
                }
            };
            #[cfg(feature = "tracing")]
            tracing::debug!("Building connector for scheme: {}", scheme);
            let _connector = builder.build(&db).await?;
            #[cfg(feature = "tracing")]
            tracing::info!("Connector built and spawned successfully: {}", scheme);
        }
        // 7. User `on_start` hooks: downcast back from `dyn Any` and spawn.
        if !self.start_fns.is_empty() {
            #[cfg(feature = "tracing")]
            tracing::debug!("Spawning {} on_start task(s)", self.start_fns.len());
            #[cfg(feature = "std")]
            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
                let start_fn = start_fn_any
                    .downcast::<StartFnType<R>>()
                    .unwrap_or_else(|_| {
                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
                    });
                let future = (*start_fn)(runtime.clone());
                runtime.spawn(future).map_err(DbError::from)?;
            }
            #[cfg(not(feature = "std"))]
            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
                let start_fn = start_fn_any
                    .downcast::<NoStdStartFnType<R>>()
                    .unwrap_or_else(|_| {
                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
                    });
                let future = (*start_fn)(runtime.clone());
                runtime.spawn(future).map_err(DbError::from)?;
            }
        }
        // Spawned tasks already hold clones of `db`, so try_unwrap normally
        // fails and we return a cheap handle clone (two Arc bumps) instead.
        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
        Ok(db_owned)
    }
}
impl Default for AimDbBuilder<NoRuntime> {
fn default() -> Self {
Self::new()
}
}
/// Handle to a built database: the shared record state plus the executor
/// runtime. Cloning is cheap (both fields are `Arc`s).
pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
    inner: Arc<AimDbInner>,
    runtime: Arc<R>,
}
impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
runtime: self.runtime.clone(),
}
}
}
impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
    /// Internal access to the shared state; exposed for crate plumbing only.
    #[doc(hidden)]
    pub fn inner(&self) -> &Arc<AimDbInner> {
        &self.inner
    }
    /// Extensions registered at build time.
    pub fn extensions(&self) -> &Extensions {
        &self.inner.extensions
    }
    /// Convenience: configure a builder with `f`, then build and run forever.
    /// Returns only if the build fails (see `AimDbBuilder::run`).
    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
        let mut b = AimDbBuilder::new().runtime(rt);
        f(&mut b);
        b.run().await
    }
    /// Spawns a detached future on the database's runtime.
    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
    where
        F: core::future::Future<Output = ()> + Send + 'static,
    {
        self.runtime.spawn(future).map_err(DbError::from)?;
        Ok(())
    }
    /// Produces `value` into the record registered under `key`.
    ///
    /// # Errors
    /// Fails if the key is unknown or `T` doesn't match the record's type.
    pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
    where
        T: Send + 'static + Debug + Clone,
    {
        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
        typed_rec.produce(value).await;
        Ok(())
    }
    /// Subscribes to the record under `key`, returning its buffer reader.
    pub fn subscribe<T>(
        &self,
        key: impl AsRef<str>,
    ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
        typed_rec.subscribe()
    }
    /// Creates a typed producer handle bound to `key`; the key is resolved
    /// when the handle is used, not here.
    pub fn producer<T>(
        &self,
        key: impl Into<alloc::string::String>,
    ) -> crate::typed_api::Producer<T, R>
    where
        T: Send + 'static + Debug + Clone,
    {
        crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
    }
    /// Creates a typed consumer handle bound to `key`; the key is resolved
    /// when the handle is used, not here.
    pub fn consumer<T>(
        &self,
        key: impl Into<alloc::string::String>,
    ) -> crate::typed_api::Consumer<T, R>
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
    }
    /// Resolves `key` to its `RecordId`, if registered.
    pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
        self.inner.resolve_str(key)
    }
    /// All record ids whose payload type is `T`.
    pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
        self.inner.records_of_type::<T>()
    }
    /// The executor runtime this database runs on.
    pub fn runtime(&self) -> &R {
        &self.runtime
    }
    /// Metadata for every record (std only; used by the remote API).
    #[cfg(feature = "std")]
    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
        self.inner.list_records()
    }
    /// Latest value of `record_name` serialized to JSON, if available.
    #[cfg(feature = "std")]
    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
        self.inner.try_latest_as_json(record_name)
    }
    /// Deserializes `json_value` into the record at `record_name`.
    #[cfg(feature = "std")]
    pub fn set_record_from_json(
        &self,
        record_name: &str,
        json_value: serde_json::Value,
    ) -> DbResult<()> {
        self.inner.set_record_from_json(record_name, json_value)
    }
    /// Streams JSON snapshots of the record at `record_key` through a bounded
    /// tokio mpsc channel.
    ///
    /// Returns the value receiver plus a oneshot sender that cancels the
    /// background forwarding task when fired or dropped. The task also exits
    /// when the receiver is dropped or the record's buffer closes; lagged
    /// reads are logged and skipped rather than terminating the stream.
    #[cfg(feature = "std")]
    #[allow(unused_variables)]
    pub fn subscribe_record_updates(
        &self,
        record_key: &str,
        queue_size: usize,
    ) -> DbResult<(
        tokio::sync::mpsc::Receiver<serde_json::Value>,
        tokio::sync::oneshot::Sender<()>,
    )> {
        use tokio::sync::{mpsc, oneshot};
        let id = self
            .inner
            .resolve_str(record_key)
            .ok_or_else(|| DbError::RecordKeyNotFound {
                key: record_key.to_string(),
            })?;
        let record = self
            .inner
            .storage(id)
            .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
        let mut json_reader = record.subscribe_json()?;
        let (value_tx, value_rx) = mpsc::channel(queue_size);
        let (cancel_tx, mut cancel_rx) = oneshot::channel();
        // Metadata is captured up front so the task can log the record name.
        let type_id = self.inner.types[id.index()];
        let key = self.inner.keys[id.index()];
        let record_metadata = record.collect_metadata(type_id, key, id);
        let runtime = self.runtime.clone();
        let spawn_result = runtime.spawn(async move {
            #[cfg(feature = "tracing")]
            tracing::debug!(
                "Subscription consumer task started for {}",
                record_metadata.name
            );
            loop {
                tokio::select! {
                    // Cancellation wins the race; also fires if the sender is dropped.
                    _ = &mut cancel_rx => {
                        #[cfg(feature = "tracing")]
                        tracing::debug!("Subscription cancelled");
                        break;
                    }
                    result = json_reader.recv_json() => {
                        match result {
                            Ok(json_val) => {
                                // send() fails only when the receiver is gone.
                                if value_tx.send(json_val).await.is_err() {
                                    #[cfg(feature = "tracing")]
                                    tracing::debug!("Subscription receiver dropped");
                                    break;
                                }
                            }
                            // Lag is recoverable: warn and keep reading.
                            Err(DbError::BufferLagged { lag_count, .. }) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "Subscription for {} lagged by {} messages",
                                    record_metadata.name,
                                    lag_count
                                );
                            }
                            Err(DbError::BufferClosed { .. }) => {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("Buffer closed for {}", record_metadata.name);
                                break;
                            }
                            Err(e) => {
                                #[cfg(feature = "tracing")]
                                tracing::error!(
                                    "Subscription error for {}: {:?}",
                                    record_metadata.name,
                                    e
                                );
                                break;
                            }
                        }
                    }
                }
            }
            #[cfg(feature = "tracing")]
            tracing::debug!("Subscription consumer task terminated");
        });
        spawn_result.map_err(DbError::from)?;
        Ok((value_rx, cancel_tx))
    }
    /// Collects every inbound connector route matching `scheme` as
    /// (topic, producer, deserializer) triples. Used by connector builders.
    #[cfg(feature = "alloc")]
    pub fn collect_inbound_routes(
        &self,
        scheme: &str,
    ) -> Vec<(
        String,
        Box<dyn crate::connector::ProducerTrait>,
        crate::connector::DeserializerFn,
    )> {
        let mut routes = Vec::new();
        // Type-erased self so links can hold the db without knowing `R`.
        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
        for record in &self.inner.storages {
            let inbound_links = record.inbound_connectors();
            for link in inbound_links {
                if link.url.scheme() != scheme {
                    continue;
                }
                let topic = link.resolve_topic();
                if let Some(producer) = link.create_producer(db_any.clone()) {
                    routes.push((topic, producer, link.deserializer.clone()));
                }
            }
        }
        #[cfg(feature = "tracing")]
        if !routes.is_empty() {
            tracing::debug!(
                "Collected {} inbound routes for scheme '{}'",
                routes.len(),
                scheme
            );
        }
        routes
    }
    /// Lists (resource id, payload TypeId) for every outbound link matching
    /// `scheme`.
    #[cfg(feature = "alloc")]
    pub fn collect_outbound_topic_type_ids(&self, scheme: &str) -> Vec<(String, TypeId)> {
        let mut result = Vec::new();
        for (idx, record) in self.inner.storages.iter().enumerate() {
            let type_id = self.inner.types[idx];
            for link in record.outbound_connectors() {
                if link.url.scheme() != scheme {
                    continue;
                }
                result.push((link.url.resource_id(), type_id));
            }
        }
        result
    }
    /// Collects every outbound connector route matching `scheme`.
    ///
    /// Links without a serializer are skipped (with a warning when tracing is
    /// enabled) rather than treated as errors.
    #[cfg(feature = "alloc")]
    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
        let mut routes = Vec::new();
        // Type-erased self so links can hold the db without knowing `R`.
        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
        for record in &self.inner.storages {
            let outbound_links = record.outbound_connectors();
            for link in outbound_links {
                if link.url.scheme() != scheme {
                    continue;
                }
                let destination = link.url.resource_id();
                let Some(serializer) = link.serializer.clone() else {
                    #[cfg(feature = "tracing")]
                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
                    continue;
                };
                if let Some(consumer) = link.create_consumer(db_any.clone()) {
                    routes.push((
                        destination,
                        consumer,
                        serializer,
                        link.config.clone(),
                        link.topic_provider.clone(),
                    ));
                }
            }
        }
        #[cfg(feature = "tracing")]
        if !routes.is_empty() {
            tracing::debug!(
                "Collected {} outbound routes for scheme '{}'",
                routes.len(),
                scheme
            );
        }
        routes
    }
}