#![cfg_attr(target_os = "linux", allow(unsafe_code))]
/// Entry point for non-Linux targets.
///
/// Does nothing except print a one-line notice on stderr and return.
#[cfg(not(target_os = "linux"))]
fn main() {
    // Kept as a named constant so the notice text lives in one place.
    const NOTICE: &str = concat!(
        "cellos-telemetry: non-Linux target — no-op (Phase F3a; ",
        "see ADR-0006 / Plans/cellos-code-complete-roadmap.md)",
    );
    eprintln!("{}", NOTICE);
}
/// Linux entry point: forks once, lets the parent exit right away and runs
/// the telemetry agent in the child.
///
/// Exit codes: `2` when `fork` fails, `0` for the parent process, `3` when
/// `agent_main` returns an error.
#[cfg(target_os = "linux")]
fn main() {
    use std::process::exit;

    // SAFETY: `fork` takes no arguments; it is the first call in `main`,
    // before this code has spawned any thread.
    let pid = unsafe { libc::fork() };

    match pid {
        // Child: this is the process that actually runs the agent.
        0 => {
            if let Err(e) = agent_main() {
                eprintln!("cellos-telemetry: agent_main failed: {e}");
                exit(3);
            }
        }
        // Parent: the child carries on by itself, nothing left to do here.
        child if child > 0 => exit(0),
        // Negative return value: no child was created.
        _ => {
            // SAFETY: `__errno_location` returns a valid pointer to the
            // calling thread's `errno`.
            let errno = unsafe { *libc::__errno_location() };
            eprintln!("cellos-telemetry: fork failed errno={errno}");
            exit(2);
        }
    }
}
/// Runs the telemetry agent loop.
///
/// Connects to the host over vsock, then repeatedly polls every probe,
/// queues the resulting events (bounded by `QUEUE_CAP`) and writes each one
/// to the vsock as an encoded frame. A failed write discards the pending
/// events and triggers a reconnect attempt.
///
/// # Errors
///
/// Returns the error string from `open_vsock` when the *initial* connection
/// cannot be established. After that the loop has no exit path, so the
/// function does not return.
#[cfg(target_os = "linux")]
fn agent_main() -> Result<(), String> {
    use std::io::Write;
    use std::time::Duration;
    use cellos_telemetry::probes::{
        capability::CapabilityProbe, inotify::InotifyProbe, net_connect::NetConnectProbe,
        process::ProcWalker,
    };
    use cellos_telemetry::{
        encode_frame, probe_source, ProbeEvent, MAX_FRAME_BODY_BYTES, VMADDR_CID_HOST,
        VSOCK_TELEMETRY_PORT,
    };

    // Maximum number of events held between polling and sending.
    const QUEUE_CAP: usize = 1024;
    // Pause between poll cycles; also used as the back-off after a failed reconnect.
    const POLL_PERIOD: Duration = Duration::from_millis(100);

    // NOTE(review): these two reads have no effect at runtime; presumably they
    // only keep the imports referenced. They used to run on every loop
    // iteration and are hoisted here because they are loop-invariant.
    let _ = MAX_FRAME_BODY_BYTES;
    let _ = probe_source::PROCESS_EXITED;

    let mut vsock = open_vsock(VMADDR_CID_HOST, VSOCK_TELEMETRY_PORT)?;

    let mut proc_walker = ProcWalker::new();
    let mut cap_probe = CapabilityProbe::new();
    let mut net_probe = NetConnectProbe::new();

    // The inotify probe is optional: it is only created when the environment
    // variable is set, and a failure to open it is logged, not fatal.
    let mut inotify_probe: Option<InotifyProbe> = None;
    if let Ok(path) = std::env::var("CELLOS_TELEMETRY_INOTIFY_PATH") {
        match InotifyProbe::open(&path, "cellos-telemetry") {
            Ok(p) => inotify_probe = Some(p),
            Err(e) => eprintln!("cellos-telemetry: inotify open({path}) failed: {e}"),
        }
    }

    // The result of the first process poll is deliberately discarded.
    // NOTE(review): presumably this establishes a baseline so pre-existing
    // processes are not reported — confirm against `ProcWalker`.
    let _ = proc_walker.poll();

    // Tally of events that were never delivered. It is maintained here and in
    // `push_or_drop` but is not reported anywhere in this function.
    let mut dropped: u64 = 0;
    let mut queue: std::collections::VecDeque<ProbeEvent> = std::collections::VecDeque::new();

    loop {
        // Collect phase: poll the probes in a fixed order and queue their events.
        for ev in proc_walker.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        for ev in cap_probe.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        for ev in net_probe.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        if let Some(p) = inotify_probe.as_mut() {
            for ev in p.poll() {
                push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
            }
        }

        // Send phase: drain the queue front to back.
        while let Some(ev) = queue.pop_front() {
            let frame = match encode_frame(&ev) {
                Ok(f) => f,
                Err(_) => {
                    // An event that cannot be encoded is skipped and counted.
                    dropped = dropped.saturating_add(1);
                    continue;
                }
            };

            if vsock.write_all(&frame).is_err() {
                // The event being written AND everything still queued is
                // lost, so count all of them (previously only the in-flight
                // event was counted). `usize` is at most 64 bits wide, so the
                // cast is lossless.
                let discarded = queue.len() as u64;
                dropped = dropped.saturating_add(1).saturating_add(discarded);
                queue.clear();

                match open_vsock(VMADDR_CID_HOST, VSOCK_TELEMETRY_PORT) {
                    Ok(s) => vsock = s,
                    Err(_) => {
                        // Reconnect failed: `vsock` keeps the old handle, so
                        // the next failed write triggers another attempt.
                        // Back off one extra period before the regular
                        // end-of-cycle sleep below.
                        std::thread::sleep(POLL_PERIOD);
                        break;
                    }
                }
            }
        }

        std::thread::sleep(POLL_PERIOD);
    }
}
/// Appends `ev` to the back of `queue` unless the queue is already full.
///
/// The queue counts as full when it holds `cap` or more elements. In that
/// case `ev` is discarded and `*dropped` is incremented, saturating at
/// `u64::MAX`. A `cap` of `0` therefore drops every element.
///
/// The function is generic over the element type because it never inspects
/// the element; existing callers passing a `VecDeque<ProbeEvent>` are
/// unaffected.
#[cfg(target_os = "linux")]
fn push_or_drop<T>(
    queue: &mut std::collections::VecDeque<T>,
    ev: T,
    cap: usize,
    dropped: &mut u64,
) {
    if queue.len() < cap {
        queue.push_back(ev);
    } else {
        *dropped = dropped.saturating_add(1);
    }
}
/// Opens a `SOCK_STREAM` vsock connection to `cid`:`port`.
///
/// The socket is created with `SOCK_CLOEXEC` so the descriptor is not
/// inherited across `exec`. On success the connected descriptor is returned
/// wrapped in a `std::fs::File`, which closes it when dropped.
///
/// # Errors
///
/// Returns a message containing the raw `errno` value when `socket(2)` or
/// `connect(2)` fails. The descriptor is closed before returning in the
/// `connect` failure case.
#[cfg(target_os = "linux")]
fn open_vsock(cid: u32, port: u32) -> Result<std::fs::File, String> {
    use std::os::unix::io::{AsRawFd, FromRawFd};

    // Address-family number for vsock, defined locally instead of taken from `libc`.
    const AF_VSOCK: libc::c_int = 40;

    // Local `#[repr(C)]` definition of the vsock socket address (16 bytes in
    // total), laid out to match what `connect` expects for `AF_VSOCK`.
    #[repr(C)]
    struct SockaddrVm {
        svm_family: u16,
        svm_reserved1: u16,
        svm_port: u32,
        svm_cid: u32,
        svm_zero: [u8; 4],
    }

    // Reads the calling thread's `errno` through std, avoiding an unsafe
    // dereference of `__errno_location`.
    let last_errno = || std::io::Error::last_os_error().raw_os_error().unwrap_or(0);

    // SAFETY: `socket` only takes integer arguments and has no memory-safety
    // preconditions.
    let fd = unsafe { libc::socket(AF_VSOCK, libc::SOCK_STREAM | libc::SOCK_CLOEXEC, 0) };
    if fd < 0 {
        return Err(format!("socket(AF_VSOCK): errno {}", last_errno()));
    }

    // Take ownership immediately so every exit path below closes the descriptor.
    // SAFETY: `fd` was just returned by a successful `socket` call and is not
    // owned by anything else.
    let sock = unsafe { std::fs::File::from_raw_fd(fd) };

    let addr = SockaddrVm {
        // The constant is 40, so narrowing to `u16` cannot truncate.
        svm_family: AF_VSOCK as u16,
        svm_reserved1: 0,
        svm_port: port,
        svm_cid: cid,
        svm_zero: [0u8; 4],
    };

    // SAFETY: `addr` is a live, fully initialised `#[repr(C)]` value that
    // outlives the call, and the length passed is exactly its size.
    let ret = unsafe {
        libc::connect(
            sock.as_raw_fd(),
            (&addr as *const SockaddrVm).cast::<libc::sockaddr>(),
            core::mem::size_of::<SockaddrVm>() as libc::socklen_t,
        )
    };
    if ret < 0 {
        // Capture errno before `sock` is dropped: closing the descriptor
        // could overwrite it.
        let errno = last_errno();
        return Err(format!(
            "vsock connect(CID={cid}, port={port}): errno {errno}"
        ));
    }

    Ok(sock)
}