use iceoryx2::port::subscriber::Subscriber;
use iceoryx2::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::{AppId, Event, IpcError, IpcPayload, IpcResult};
/// Receiving end of the IPC event channel.
///
/// Wraps an iceoryx2 subscriber port together with a little bookkeeping: the
/// id of the owning application, an "active" flag and the time at which
/// events were last received.
#[derive(Debug)]
pub struct EventSubscriber {
/// iceoryx2 subscriber port from which `IpcPayload` samples are read.
subscriber: Subscriber<ipc::Service, IpcPayload, ()>,
/// Id of the application that owns this subscriber (the `subscriber_app`
/// argument of `new`).
app_id: AppId,
/// Set to `true` on construction and cleared by `deactivate()` / `Drop`;
/// while it is `false`, `try_receive` returns an empty list without polling.
is_active: Arc<AtomicBool>,
/// Initialised at construction and refreshed whenever `try_receive` returns
/// at least one event.
last_heartbeat: Instant,
}
impl EventSubscriber {
    /// Name of the iceoryx2 publish/subscribe service every subscriber opens.
    const SERVICE_NAME: &'static str = "e_midi_events";

    /// Longest pause between two polls inside [`EventSubscriber::receive_timeout`].
    const POLL_INTERVAL: Duration = Duration::from_millis(1);

    /// Opens the shared event service and creates a subscriber port on it.
    ///
    /// The service is *opened*, not created, so a publisher must already have
    /// created it. `source_app` is only reported in the debug log line: the
    /// service name is fixed and does not depend on it. `subscriber_app` is
    /// stored and later returned by [`EventSubscriber::app_id`].
    ///
    /// # Errors
    ///
    /// * `IpcError::NodeCreation` if the iceoryx2 node cannot be created.
    /// * `IpcError::ServiceCreation` if the service name is rejected or the
    ///   service cannot be opened.
    /// * `IpcError::SubscriberCreation` if the subscriber port cannot be created.
    pub fn new(source_app: AppId, subscriber_app: AppId) -> IpcResult<Self> {
        println!("[IPC SUBSCRIBER DEBUG] Using service name: {} (source_app: {:?}, subscriber_app: {:?})", Self::SERVICE_NAME, source_app, subscriber_app);

        let node = NodeBuilder::new().create::<ipc::Service>().map_err(|e| {
            eprintln!("[IPC SUBSCRIBER ERROR] Node creation failed: {:?}", e);
            IpcError::NodeCreation("Node creation failed".to_string())
        })?;

        let service = node
            .service_builder(&Self::SERVICE_NAME.try_into().map_err(|e| {
                eprintln!("[IPC SUBSCRIBER ERROR] Invalid service name: {:?}", e);
                IpcError::ServiceCreation("Invalid service name".to_string())
            })?)
            .publish_subscribe::<IpcPayload>()
            .open()
            .map_err(|e| {
                eprintln!(
                    "[IPC SUBSCRIBER ERROR] Failed to open service (does publisher exist?): {:?}",
                    e
                );
                IpcError::ServiceCreation(format!("Failed to open service: {:?}", e))
            })?;

        let subscriber = service.subscriber_builder().create().map_err(|e| {
            eprintln!(
                "[IPC SUBSCRIBER ERROR] Failed to create subscriber: {:?}",
                e
            );
            IpcError::SubscriberCreation("Failed to create subscriber".to_string())
        })?;

        Ok(Self {
            subscriber,
            app_id: subscriber_app,
            is_active: Arc::new(AtomicBool::new(true)),
            last_heartbeat: Instant::now(),
        })
    }

    /// Drains every sample currently queued on the subscriber without blocking.
    ///
    /// Each sample payload is decoded as JSON, first as a single `Event` and,
    /// failing that, as a `Vec<Event>` batch. Returns an empty list when the
    /// subscriber has been deactivated or nothing is queued. The heartbeat
    /// timestamp is refreshed whenever at least one event is returned.
    ///
    /// # Errors
    ///
    /// * `IpcError::ReceiveError` if the port fails before any event was decoded.
    /// * `IpcError::DeserializationError` if a payload matches neither format
    ///   before any event was decoded.
    ///
    /// If a failure happens *after* events were already decoded in this call,
    /// the failure is logged to stderr and the decoded events are returned.
    /// Their samples have already been consumed from the queue, so returning
    /// an error at that point would silently lose them.
    pub fn try_receive(&mut self) -> IpcResult<Vec<Event>> {
        if !self.is_active.load(Ordering::Relaxed) {
            return Ok(Vec::new());
        }

        let mut events: Vec<Event> = Vec::new();
        let mut sample_count = 0;

        loop {
            let sample = match self.subscriber.receive() {
                Ok(Some(sample)) => sample,
                Ok(None) => break,
                Err(e) => {
                    eprintln!("[IPC SUBSCRIBER ERROR] Failed to receive sample: {:?}", e);
                    if events.is_empty() {
                        return Err(IpcError::ReceiveError("Failed to receive".to_string()));
                    }
                    // Hand back what was decoded so far; a persistent port
                    // failure will surface as an error on the next call.
                    break;
                }
            };

            sample_count += 1;
            println!("[IPC SUBSCRIBER DEBUG] Received sample #{}", sample_count);

            let json_data = sample.payload();
            if let Ok(event) = serde_json::from_slice::<Event>(json_data) {
                events.push(event);
            } else if let Ok(batch) = serde_json::from_slice::<Vec<Event>>(json_data) {
                events.extend(batch);
            } else {
                eprintln!(
                    "[IPC SUBSCRIBER ERROR] Failed to deserialize event data: {:?}",
                    json_data
                );
                if events.is_empty() {
                    return Err(IpcError::DeserializationError(
                        "Failed to deserialize event data".to_string(),
                    ));
                }
                // Keep the events decoded from earlier samples of this drain;
                // samples still queued are picked up by the next call.
                break;
            }
        }

        if !events.is_empty() {
            self.last_heartbeat = Instant::now();
        }
        Ok(events)
    }

    /// Polls [`EventSubscriber::try_receive`] until events arrive or `timeout`
    /// has elapsed, returning an empty list on timeout.
    ///
    /// The queue is always polled at least once, so a zero `timeout` behaves
    /// like a single non-blocking receive. Between polls the thread sleeps for
    /// at most one millisecond and never past the deadline.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`EventSubscriber::try_receive`].
    pub fn receive_timeout(&mut self, timeout: Duration) -> IpcResult<Vec<Event>> {
        let start = Instant::now();
        loop {
            let events = self.try_receive()?;
            if !events.is_empty() {
                return Ok(events);
            }

            match timeout.checked_sub(start.elapsed()) {
                Some(remaining) if !remaining.is_zero() => {
                    // Clamp the pause so short timeouts are not overshot.
                    std::thread::sleep(remaining.min(Self::POLL_INTERVAL));
                }
                _ => return Ok(Vec::new()),
            }
        }
    }

    /// Returns `true` until [`EventSubscriber::deactivate`] has been called.
    pub fn is_active(&self) -> bool {
        self.is_active.load(Ordering::Relaxed)
    }

    /// Clears the activity flag; later `try_receive` calls return no events.
    pub fn deactivate(&self) {
        self.is_active.store(false, Ordering::Relaxed);
    }

    /// Time elapsed since events were last received, or since construction if
    /// none have been received yet.
    pub fn time_since_last_event(&self) -> Duration {
        self.last_heartbeat.elapsed()
    }

    /// Returns `true` if events were received less than `timeout` ago.
    pub fn source_is_alive(&self, timeout: Duration) -> bool {
        self.time_since_last_event() < timeout
    }

    /// Id of the application that owns this subscriber.
    pub fn app_id(&self) -> AppId {
        self.app_id
    }
}
impl Drop for EventSubscriber {
fn drop(&mut self) {
self.deactivate();
}
}
// SAFETY: NOTE(review) — not established by anything in this file. These impls
// assert that the wrapped iceoryx2 `Subscriber<ipc::Service, ..>` may be moved
// to, and referenced from, other threads. The `&self` methods of
// `EventSubscriber` only touch the atomic flag, the `Instant` and the `AppId`,
// but the derived `Debug` impl also reads the subscriber through `&self`.
// TODO confirm against the iceoryx2 version in use that this is sound, or
// whether a thread-safe service variant should be used instead of these impls.
unsafe impl Send for EventSubscriber {}
unsafe impl Sync for EventSubscriber {}
/// Per-category switches deciding which events pass through
/// `EventFilter::filter`.
///
/// Each flag enables (`true`) or suppresses (`false`) one family of `Event`
/// variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventFilter {
    /// Pass MIDI command, playback and note events.
    midi_events: bool,
    /// Pass window events.
    window_events: bool,
    /// Pass grid events.
    grid_events: bool,
    /// Pass system and state request/response events.
    system_events: bool,
}
impl EventFilter {
    /// Builds a filter that lets every event category through.
    pub fn new() -> Self {
        let enabled = true;
        Self {
            midi_events: enabled,
            window_events: enabled,
            grid_events: enabled,
            system_events: enabled,
        }
    }

    /// Builds a filter that passes MIDI events and drops everything else.
    pub fn midi_only() -> Self {
        Self {
            window_events: false,
            grid_events: false,
            system_events: false,
            ..Self::new()
        }
    }

    /// Builds a filter that passes system events and drops everything else.
    pub fn system_only() -> Self {
        Self {
            midi_events: false,
            window_events: false,
            grid_events: false,
            ..Self::new()
        }
    }

    /// Returns the events whose category is enabled, in their original order.
    pub fn filter(&self, events: Vec<Event>) -> Vec<Event> {
        let mut kept = events;
        kept.retain(|candidate| self.allows(candidate));
        kept
    }

    /// Looks up the switch governing `event`'s category.
    ///
    /// The match deliberately has no wildcard arm, so adding an `Event`
    /// variant without assigning it a category is a compile error.
    fn allows(&self, event: &Event) -> bool {
        match event {
            // MIDI: commands, playback notifications and raw note messages.
            Event::MidiCommandPlay { .. }
            | Event::MidiCommandStop { .. }
            | Event::MidiCommandPause { .. }
            | Event::MidiCommandResume { .. }
            | Event::MidiCommandNext { .. }
            | Event::MidiCommandPrevious { .. }
            | Event::MidiCommandSetTempo { .. }
            | Event::MidiCommandSongListRequest { .. }
            | Event::MidiPlaybackStarted { .. }
            | Event::MidiPlaybackStopped { .. }
            | Event::MidiPlaybackPaused { .. }
            | Event::MidiPlaybackResumed { .. }
            | Event::MidiTempoChanged { .. }
            | Event::MidiSongChanged { .. }
            | Event::MidiProgressUpdate { .. }
            | Event::MidiSongListUpdated { .. }
            | Event::MidiNoteOn { .. }
            | Event::MidiNoteOff { .. }
            | Event::MidiProgramChange { .. } => self.midi_events,

            Event::WindowFocused { .. }
            | Event::WindowClosed { .. }
            | Event::WindowResized { .. } => self.window_events,

            Event::GridCellSelected { .. }
            | Event::GridCellUpdated { .. }
            | Event::GridStateChanged { .. } => self.grid_events,

            Event::SystemShutdown { .. }
            | Event::SystemHeartbeat { .. }
            | Event::StateRequest { .. }
            | Event::StateResponse { .. } => self.system_events,
        }
    }
}
impl Default for EventFilter {
fn default() -> Self {
Self::new()
}
}