#![warn(missing_docs)]
#![warn(rustdoc::missing_crate_level_docs)]
#![doc(html_root_url = "https://docs.rs/autocore-std/3.3.0")]
use anyhow::{anyhow, Result};
use futures_util::{SinkExt, StreamExt};
use log::LevelFilter;
use mechutil::ipc::{CommandMessage, MessageType};
use raw_sync::events::{Event, EventInit, EventState};
use raw_sync::Timeout;
use shared_memory::ShmemConf;
use std::collections::HashMap;
use std::sync::atomic::{fence, Ordering, AtomicBool};
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::{connect_async, tungstenite::Message};
pub mod logger;
pub use log;
pub mod fb;
pub mod iface;
pub mod command_client;
pub use command_client::CommandClient;
pub mod ethercat;
pub mod motion;
pub mod shm;
pub mod diagnostics;
pub mod banner;
pub mod fixed_string;
pub use fixed_string::FixedString;
pub trait AutoCoreMemory {}
pub trait ChangeTracker {
fn get_changes(&self, prev: &Self) -> Vec<(&'static str, serde_json::Value)>;
fn unpack_bits(&mut self) {}
fn pack_bits(&mut self, _pre_tick: &Self) {}
}
pub struct TickContext<'a, M> {
pub gm: &'a mut M,
pub client: &'a mut CommandClient,
pub cycle: u64,
}
pub trait ControlProgram {
type Memory: Copy + ChangeTracker;
fn initialize(&mut self, _mem: &mut Self::Memory) {}
fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>);
}
#[derive(Debug, Clone)]
pub struct RunnerConfig {
pub server_host: String,
pub ws_port: u16,
pub module_name: String,
pub shm_name: String,
pub tick_signal_name: String,
pub busy_signal_name: Option<String>,
pub log_level: LevelFilter,
pub log_udp_port: u16,
}
pub const DEFAULT_WS_PORT: u16 = 11969;
impl Default for RunnerConfig {
fn default() -> Self {
Self {
server_host: "127.0.0.1".to_string(),
ws_port: DEFAULT_WS_PORT,
module_name: "control".to_string(),
shm_name: "autocore_cyclic".to_string(),
tick_signal_name: "tick".to_string(),
busy_signal_name: None,
log_level: LevelFilter::Info,
log_udp_port: logger::DEFAULT_LOG_UDP_PORT,
}
}
}
pub struct ControlRunner<P: ControlProgram> {
config: RunnerConfig,
program: P,
}
impl<P: ControlProgram> ControlRunner<P> {
pub fn new(program: P) -> Self {
Self {
config: RunnerConfig::default(),
program,
}
}
pub fn config(mut self, config: RunnerConfig) -> Self {
self.config = config;
self
}
pub fn run(mut self) -> Result<()> {
if let Err(e) = logger::init_udp_logger(
&self.config.server_host,
self.config.log_udp_port,
self.config.log_level,
"control",
) {
eprintln!("Warning: Failed to initialize UDP logger: {}", e);
}
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()?;
rt.block_on(async {
log::info!("AutoCore Control Runner Starting...");
let ws_url = format!("ws://{}:{}/ws/", self.config.server_host, self.config.ws_port);
log::info!("Connecting to server at {}", ws_url);
let (ws_stream, _) = connect_async(&ws_url).await
.map_err(|e| anyhow!("Failed to connect to server at {}: {}", ws_url, e))?;
let (mut write, mut read) = ws_stream.split();
let request = CommandMessage::request("gm.get_layout", serde_json::Value::Null);
let transaction_id = request.transaction_id;
let request_json = serde_json::to_string(&request)?;
write.send(Message::Text(request_json)).await
.map_err(|e| anyhow!("Failed to send layout request: {}", e))?;
let timeout = Duration::from_secs(10);
let start = std::time::Instant::now();
let mut layout: Option<HashMap<String, serde_json::Value>> = None;
while start.elapsed() < timeout {
match tokio::time::timeout(Duration::from_secs(1), read.next()).await {
Ok(Some(Ok(Message::Text(text)))) => {
if let Ok(response) = serde_json::from_str::<CommandMessage>(&text) {
if response.transaction_id == transaction_id {
if !response.success {
return Err(anyhow!("Server error: {}", response.error_message));
}
layout = Some(serde_json::from_value(response.data)?);
break;
}
if response.message_type == MessageType::Broadcast {
continue;
}
}
}
Ok(Some(Ok(_))) => continue,
Ok(Some(Err(e))) => return Err(anyhow!("WebSocket error: {}", e)),
Ok(None) => return Err(anyhow!("Server closed connection")),
Err(_) => continue, }
}
let layout = layout.ok_or_else(|| anyhow!("Timeout waiting for layout response"))?;
log::info!("Layout received with {} entries.", layout.len());
let (ws_write_tx, mut ws_write_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel::<CommandMessage>();
tokio::spawn(async move {
while let Some(msg_json) = ws_write_rx.recv().await {
if let Err(e) = write.send(Message::Text(msg_json)).await {
log::error!("WebSocket write error: {}", e);
break;
}
}
});
tokio::spawn(async move {
while let Some(result) = read.next().await {
match result {
Ok(Message::Text(text)) => {
if let Ok(msg) = serde_json::from_str::<CommandMessage>(&text) {
if msg.message_type == MessageType::Response {
if response_tx.send(msg).is_err() {
break; }
}
}
}
Ok(Message::Close(_)) => {
log::info!("WebSocket closed by server");
break;
}
Err(e) => {
log::error!("WebSocket read error: {}", e);
break;
}
_ => {} }
}
});
let mut command_client = CommandClient::new(ws_write_tx.clone(), response_rx);
let tick_offset = self.find_offset(&layout, &self.config.tick_signal_name)?;
let busy_offset = if let Some(name) = &self.config.busy_signal_name {
Some(self.find_offset(&layout, name)?)
} else {
None
};
let shmem = ShmemConf::new().os_id(&self.config.shm_name).open()?;
let base_ptr = shmem.as_ptr();
log::info!("Shared Memory '{}' mapped.", self.config.shm_name);
let gm = unsafe { &mut *(base_ptr as *mut P::Memory) };
log::info!("Setting up tick event at offset {} (base_ptr: {:p})", tick_offset, base_ptr);
let (tick_event, _) = unsafe {
Event::from_existing(base_ptr.add(tick_offset))
}.map_err(|e| anyhow!("Failed to open tick event: {:?}", e))?;
log::info!("Tick event ready");
let busy_event = busy_offset.map(|offset| {
unsafe { Event::from_existing(base_ptr.add(offset)) }
.map(|(event, _)| event)
.ok()
}).flatten();
let mut local_mem: P::Memory = unsafe { std::ptr::read_volatile(gm) };
let mut prev_mem: P::Memory = local_mem;
fence(Ordering::Acquire);
self.program.initialize(&mut local_mem);
fence(Ordering::Release);
unsafe { std::ptr::write_volatile(gm, local_mem) };
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
if let Err(e) = ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
}) {
log::warn!("Failed to set signal handler: {}", e);
}
log::info!("Entering Control Loop - waiting for first tick...");
let mut cycle_count: u64 = 0;
let mut consecutive_timeouts: u32 = 0;
while running.load(Ordering::SeqCst) {
match tick_event.wait(Timeout::Val(Duration::from_secs(1))) {
Ok(_) => {
consecutive_timeouts = 0;
},
Err(e) => {
let err_str = format!("{:?}", e);
if err_str.contains("Timeout") {
consecutive_timeouts += 1;
if consecutive_timeouts == 10 {
log::error!(
"TICK STALL: {} consecutive timeouts! cycle={} pending={} responses={} fds={} rss_kb={}",
consecutive_timeouts,
cycle_count,
command_client.pending_count(),
command_client.response_count(),
diagnostics::count_open_fds(),
diagnostics::get_rss_kb(),
);
}
if consecutive_timeouts > 10 && consecutive_timeouts % 60 == 0 {
log::error!(
"TICK STALL continues: {} consecutive timeouts, cycle={}",
consecutive_timeouts,
cycle_count,
);
}
continue;
}
return Err(anyhow!("Tick wait failed: {:?}", e));
}
}
if !running.load(Ordering::SeqCst) {
log::info!("Shutdown signal received, exiting control loop.");
break;
}
cycle_count += 1;
if cycle_count == 1 {
log::info!("First tick received!");
}
local_mem = unsafe { std::ptr::read_volatile(gm) };
prev_mem = local_mem;
fence(Ordering::Acquire);
local_mem.unpack_bits();
let pre_tick = local_mem;
command_client.poll();
let mut ctx = TickContext {
gm: &mut local_mem,
client: &mut command_client,
cycle: cycle_count,
};
self.program.process_tick(&mut ctx);
local_mem.pack_bits(&pre_tick);
fence(Ordering::Release);
unsafe { std::ptr::write_volatile(gm, local_mem) };
let changes = local_mem.get_changes(&prev_mem);
if !changes.is_empty() {
let mut data_map = serde_json::Map::new();
for (key, val) in changes {
data_map.insert(key.to_string(), val);
}
let msg = CommandMessage::request("gm.write", serde_json::Value::Object(data_map));
let msg_json = serde_json::to_string(&msg).unwrap_or_default();
if let Err(e) = ws_write_tx.send(msg_json) {
log::error!("Failed to send updates: {}", e);
}
}
if let Some(ref busy_ev) = busy_event {
let _ = busy_ev.set(EventState::Signaled);
}
}
Ok(())
})
}
fn find_offset(&self, layout: &HashMap<String, serde_json::Value>, name: &str) -> Result<usize> {
let info = layout.get(name).ok_or_else(|| anyhow!("Signal '{}' not found in layout", name))?;
info.get("offset")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.ok_or_else(|| anyhow!("Invalid offset for '{}'", name))
}
}
#[macro_export]
macro_rules! autocore_main {
($prog_type:ty, $shm_name:expr, $tick_signal:expr) => {
fn main() -> anyhow::Result<()> {
let config = autocore_std::RunnerConfig {
server_host: "127.0.0.1".to_string(),
ws_port: autocore_std::DEFAULT_WS_PORT,
module_name: "control".to_string(),
shm_name: $shm_name.to_string(),
tick_signal_name: $tick_signal.to_string(),
busy_signal_name: None,
log_level: log::LevelFilter::Info,
log_udp_port: autocore_std::logger::DEFAULT_LOG_UDP_PORT,
};
autocore_std::ControlRunner::new(<$prog_type>::new())
.config(config)
.run()
}
};
}