use std::{path::PathBuf, sync::Arc};
use anyhow::{anyhow, bail};
use bpfman::types::{
AttachInfo, FentryProgram, FexitProgram, KprobeProgram, ListFilter, Location, Program,
ProgramData, TcProceedOn, TcProgram, TcxProgram, TracepointProgram, UprobeProgram,
XdpProceedOn, XdpProgram,
};
use bpfman_api::v1::{
AttachRequest, AttachResponse, BpfmanProgramType, DetachRequest, DetachResponse, GetRequest,
GetResponse, ListRequest, ListResponse, LoadRequest, LoadResponse, LoadResponseInfo,
ProgSpecificInfo, PullBytecodeRequest, PullBytecodeResponse, UnloadRequest, UnloadResponse,
attach_info::Info, bpfman_server::Bpfman, bytecode_location::Location as RpcLocation,
list_response::ListResult,
};
use log::error;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};
use crate::AsyncBpfman;
pub struct BpfmanLoader {
lock: Arc<Mutex<AsyncBpfman>>,
}
impl BpfmanLoader {
pub(crate) fn new(lock: Arc<Mutex<AsyncBpfman>>) -> BpfmanLoader {
BpfmanLoader { lock }
}
}
impl BpfmanLoader {
async fn do_load(&self, request: Request<LoadRequest>) -> anyhow::Result<LoadResponse> {
let request = request.into_inner();
let bytecode_source = match request
.bytecode
.ok_or(anyhow!("missing bytecode info"))?
.location
.ok_or(anyhow!("missing location"))?
{
RpcLocation::Image(i) => Location::Image(i.into()),
RpcLocation::File(p) => Location::File(p),
};
let programs : Vec<Result<Program, anyhow::Error>> = request.info.iter().map(|info| {
let data = ProgramData::new(
bytecode_source.clone(),
info.name.clone(),
request.metadata.clone(),
request.global_data.clone(),
request.map_owner_id,
)?;
let program = match info.program_type() {
BpfmanProgramType::Xdp => Program::Xdp(XdpProgram::new(data)?),
BpfmanProgramType::Tc => Program::Tc(TcProgram::new(data)?),
BpfmanProgramType::Tcx => Program::Tcx(TcxProgram::new(data)?),
BpfmanProgramType::Tracepoint => Program::Tracepoint(TracepointProgram::new(data)?),
BpfmanProgramType::Kprobe => Program::Kprobe(KprobeProgram::new(data)?),
BpfmanProgramType::Uprobe => Program::Uprobe(UprobeProgram::new(data)?),
BpfmanProgramType::Fentry => {
if let Some(ProgSpecificInfo {
info: Some(bpfman_api::v1::prog_specific_info::Info::FentryLoadInfo(fentry)),
}) = &info.info
{
Program::Fentry(FentryProgram::new(data, fentry.fn_name.clone())?)
} else {
bail!("missing FentryInfo");
}
}
BpfmanProgramType::Fexit => {
if let Some(ProgSpecificInfo {
info: Some(bpfman_api::v1::prog_specific_info::Info::FexitLoadInfo(fexit)),
}) = &info.info
{
Program::Fexit(FexitProgram::new(data, fexit.fn_name.clone())?)
} else {
bail!("missing FexitInfo");
}
}
};
Ok(program)
}).collect();
for p in programs.iter() {
if let Err(e) = p {
bail!("{e}");
}
}
let bpfman_lock = self.lock.lock().await;
let add_prog_result = bpfman_lock
.add_programs(programs.into_iter().map(|p| p.unwrap()).collect())
.await?;
let mut load_program_info = vec![];
for p in add_prog_result.iter() {
let info = p.try_into()?;
let kernel_info = p.try_into()?;
load_program_info.push(LoadResponseInfo {
info: Some(info),
kernel_info: Some(kernel_info),
});
}
let reply_entry = LoadResponse {
programs: load_program_info,
};
Ok(reply_entry)
}
async fn do_unload(&self, request: Request<UnloadRequest>) -> anyhow::Result<UnloadResponse> {
let reply = UnloadResponse {};
let request = request.into_inner();
let bpfman_lock = self.lock.lock().await;
bpfman_lock.remove_program(request.id).await?;
Ok(reply)
}
async fn do_get(&self, request: Request<GetRequest>) -> anyhow::Result<GetResponse> {
let request = request.into_inner();
let id = request.id;
let bpfman_lock = self.lock.lock().await;
let program = bpfman_lock.get_program(id).await?;
let reply_entry = GetResponse {
info: if let Program::Unsupported(_) = program {
None
} else {
Some((&program).try_into()?)
},
kernel_info: Some((&program).try_into()?),
};
Ok(reply_entry)
}
async fn do_list(&self, request: Request<ListRequest>) -> anyhow::Result<ListResponse> {
let mut reply = ListResponse { results: vec![] };
let filter = ListFilter::new(
request.get_ref().program_type,
request.get_ref().match_metadata.clone(),
request.get_ref().bpfman_programs_only(),
);
let bpfman_lock = self.lock.lock().await;
for r in bpfman_lock
.list_programs(filter)
.await
.map_err(|e| Status::aborted(format!("failed to list programs: {e}")))?
{
let reply_entry = ListResult {
info: if let Program::Unsupported(_) = r {
None
} else {
Some((&r).try_into()?)
},
kernel_info: Some((&r).try_into()?),
};
reply.results.push(reply_entry)
}
Ok(reply)
}
async fn do_pull_bytecode(
&self,
request: tonic::Request<PullBytecodeRequest>,
) -> anyhow::Result<PullBytecodeResponse> {
let request = request.into_inner();
let image = match request.image {
Some(i) => i.into(),
None => bail!("Empty pull_bytecode request received"),
};
let bpfman_lock = self.lock.lock().await;
bpfman_lock.pull_bytecode(image).await?;
let reply = PullBytecodeResponse {};
Ok(reply)
}
async fn do_attach(
&self,
request: tonic::Request<AttachRequest>,
) -> anyhow::Result<AttachResponse> {
let request = request.into_inner();
let attach_info = if let Some(info) = request.attach {
match info.info {
Some(Info::XdpAttachInfo(i)) => AttachInfo::Xdp {
priority: i.priority,
iface: i.iface,
proceed_on: XdpProceedOn::from_int32s(i.proceed_on)
.map_err(|_| Status::aborted("failed to parse proceed_on"))?,
netns: i.netns.map(PathBuf::from),
metadata: i.metadata,
},
Some(Info::TcAttachInfo(i)) => AttachInfo::Tc {
priority: i.priority,
iface: i.iface,
direction: i.direction,
proceed_on: TcProceedOn::from_int32s(i.proceed_on)
.map_err(|_| Status::aborted("failed to parse proceed_on"))?,
netns: i.netns.map(PathBuf::from),
metadata: i.metadata,
},
Some(Info::TcxAttachInfo(i)) => AttachInfo::Tcx {
priority: i.priority,
iface: i.iface,
direction: i.direction,
netns: i.netns.map(PathBuf::from),
metadata: i.metadata,
},
Some(Info::TracepointAttachInfo(i)) => AttachInfo::Tracepoint {
tracepoint: i.tracepoint,
metadata: i.metadata,
},
Some(Info::KprobeAttachInfo(i)) => AttachInfo::Kprobe {
fn_name: i.fn_name,
offset: i.offset,
container_pid: i.container_pid,
metadata: i.metadata,
},
Some(Info::UprobeAttachInfo(i)) => AttachInfo::Uprobe {
fn_name: i.fn_name,
offset: i.offset,
target: i.target,
pid: i.pid,
container_pid: i.container_pid,
metadata: i.metadata,
},
Some(Info::FentryAttachInfo(i)) => AttachInfo::Fentry {
metadata: i.metadata,
},
Some(Info::FexitAttachInfo(i)) => AttachInfo::Fexit {
metadata: i.metadata,
},
None => bail!("missing attach_info"),
}
} else {
bail!("missing attach_info");
};
let bpfman_lock = self.lock.lock().await;
let link = bpfman_lock.attach(request.id, attach_info).await?;
let link_id = link.get_id()?;
Ok(AttachResponse { link_id })
}
async fn do_detach(&self, request: Request<DetachRequest>) -> anyhow::Result<DetachResponse> {
let request = request.into_inner();
let bpfman_lock = self.lock.lock().await;
bpfman_lock.detach(request.link_id).await?;
Ok(DetachResponse {})
}
}
#[tonic::async_trait]
impl Bpfman for BpfmanLoader {
async fn load(&self, request: Request<LoadRequest>) -> Result<Response<LoadResponse>, Status> {
self.do_load(request)
.await
.map_err(|e| {
error!("Error in load: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn unload(
&self,
request: Request<UnloadRequest>,
) -> Result<Response<UnloadResponse>, Status> {
self.do_unload(request)
.await
.map_err(|e| {
error!("Error in get: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
self.do_get(request)
.await
.map_err(|e| {
error!("Error in get: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn attach(
&self,
request: Request<AttachRequest>,
) -> Result<Response<AttachResponse>, Status> {
self.do_attach(request)
.await
.map_err(|e| {
error!("Error in attach: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn detach(
&self,
request: Request<DetachRequest>,
) -> Result<Response<DetachResponse>, Status> {
self.do_detach(request)
.await
.map_err(|e| {
error!("Error in detach: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn list(&self, request: Request<ListRequest>) -> Result<Response<ListResponse>, Status> {
self.do_list(request)
.await
.map_err(|e| {
error!("Error in list: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
async fn pull_bytecode(
&self,
request: tonic::Request<PullBytecodeRequest>,
) -> std::result::Result<tonic::Response<PullBytecodeResponse>, tonic::Status> {
self.do_pull_bytecode(request)
.await
.map_err(|e| {
error!("Error in pull: {e}");
Status::aborted(format!("{e}"))
})
.map(Response::new)
}
}