#![doc(hidden)]
use async_trait::async_trait;
use sof::{
framework::{
Plugin, PluginConfig, PluginContext, PluginHost, PluginSetupError, TransactionDispatchMode,
TransactionEvent,
},
runtime::{ObserverRuntime, RuntimeError, RuntimeSetup, ShredTrustMode},
};
#[cfg(all(target_os = "linux", feature = "kernel-bypass"))]
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
#[cfg(all(target_os = "linux", feature = "kernel-bypass"))]
#[path = "support/af_xdp_ingress.rs"]
mod af_xdp_ingress;
#[derive(Debug, Clone, Copy, Default)]
struct TrustedProviderInlinePlugin;
#[async_trait]
impl Plugin for TrustedProviderInlinePlugin {
fn name(&self) -> &'static str {
"trusted-provider-inline-plugin"
}
fn config(&self) -> PluginConfig {
PluginConfig::new().with_transaction_mode(TransactionDispatchMode::Inline)
}
async fn setup(&self, ctx: PluginContext) -> Result<(), PluginSetupError> {
tracing::info!(
plugin = ctx.plugin_name,
"trusted provider plugin startup completed"
);
Ok(())
}
async fn on_transaction(&self, event: &TransactionEvent) {
tracing::debug!(
slot = event.slot,
signature = ?event.signature,
"trusted provider transaction callback"
);
}
async fn shutdown(&self, ctx: PluginContext) {
tracing::info!(
plugin = ctx.plugin_name,
"trusted provider plugin shutdown completed"
);
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), RuntimeError> {
let host = PluginHost::builder()
.add_plugin(TrustedProviderInlinePlugin)
.build();
let setup = RuntimeSetup::new().with_shred_trust_mode(ShredTrustMode::TrustedRawShredProvider);
#[cfg(all(target_os = "linux", feature = "kernel-bypass"))]
if std::env::var_os("SOF_AF_XDP_IFACE").is_some() {
let config = af_xdp_ingress::read_af_xdp_config();
let stop = Arc::new(AtomicBool::new(false));
let producer_stop = Arc::clone(&stop);
let (tx, rx) = sof::runtime::create_kernel_bypass_ingress_queue();
tracing::info!(
interface = %config.interface,
queue_id = config.queue_id,
ring_depth = config.ring_depth,
batch_size = config.batch_size,
trust_mode = %ShredTrustMode::TrustedRawShredProvider.as_str(),
"starting trusted raw shred provider example"
);
let producer_task = tokio::task::spawn_blocking(move || {
af_xdp_ingress::run_af_xdp_producer_until(&tx, &config, &producer_stop)
});
let producer_wait_task = tokio::spawn(async move {
let result = producer_task.await.map_err(|error| {
RuntimeError::Runloop(format!("AF_XDP producer task join failed: {error}"))
})?;
result
.map_err(|error| RuntimeError::Runloop(format!("AF_XDP producer failed: {error}")))
});
let runtime_result = ObserverRuntime::new()
.with_plugin_host(host)
.with_setup(setup)
.with_kernel_bypass_ingress(rx)
.run_until_termination_signal()
.await;
stop.store(true, Ordering::Relaxed);
let producer_result = producer_wait_task.await.map_err(|error| {
RuntimeError::Runloop(format!("AF_XDP producer waiter join failed: {error}"))
})?;
producer_result?;
return runtime_result;
}
tracing::warn!(
trust_mode = %ShredTrustMode::TrustedRawShredProvider.as_str(),
"kernel-bypass producer not configured; falling back to normal SOF runtime bring-up"
);
ObserverRuntime::new()
.with_plugin_host(host)
.with_setup(setup)
.run_until_termination_signal()
.await
}