use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use super::config::{ConnectionType, ParamMap, X714Config};
use super::parser::parse_line_to_events;
use super::transport::{SharedEventHandler, default_event_handler, dispatch_event};
use super::types::X714Event;
pub(crate) struct X714Shared {
pub is_connected: AtomicBool,
pub is_reading: AtomicBool,
pub serial_number: Mutex<Option<String>>,
pub writer: tokio::sync::Mutex<Option<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
pub ble_write_tx: tokio::sync::Mutex<Option<tokio::sync::mpsc::UnboundedSender<Vec<u8>>>>,
pub running: AtomicBool,
}
impl X714Shared {
pub fn new() -> Arc<Self> {
Arc::new(Self {
is_connected: AtomicBool::new(false),
is_reading: AtomicBool::new(false),
serial_number: Mutex::new(None),
writer: tokio::sync::Mutex::new(None),
ble_write_tx: tokio::sync::Mutex::new(None),
running: AtomicBool::new(true),
})
}
}
pub struct X714 {
pub config: X714Config,
pub on_event: SharedEventHandler,
pub(crate) shared: Arc<X714Shared>,
}
impl Clone for X714 {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
on_event: Arc::clone(&self.on_event),
shared: Arc::clone(&self.shared),
}
}
}
impl Default for X714 {
fn default() -> Self {
Self::new(X714Config::default())
}
}
impl X714 {
pub fn new(config: X714Config) -> Self {
Self {
config,
on_event: default_event_handler(),
shared: X714Shared::new(),
}
}
pub fn from_map(params: ParamMap) -> Result<Self, String> {
Ok(Self::new(X714Config::from_map(params)?))
}
pub fn with_event_handler(mut self, handler: SharedEventHandler) -> Self {
self.on_event = handler;
self
}
pub fn set_event_handler(&mut self, handler: SharedEventHandler) {
self.on_event = handler;
}
pub fn is_connected(&self) -> bool {
self.shared.is_connected.load(Ordering::Relaxed)
}
pub fn is_reading(&self) -> bool {
self.shared.is_reading.load(Ordering::Relaxed)
}
pub fn serial_number(&self) -> Option<String> {
self.shared.serial_number.lock().unwrap().clone()
}
pub fn to_map(&self) -> ParamMap {
self.config.to_map()
}
pub fn connect_instruction(&self) -> String {
match self.config.connection_type {
ConnectionType::Serial => format!(
"SERIAL {} @ {} (VID={:#06x}, PID={:#06x})",
self.config.serial.port,
self.config.serial.baudrate,
self.config.serial.vid,
self.config.serial.pid
),
ConnectionType::Tcp => format!("TCP {}:{}", self.config.tcp.ip, self.config.tcp.port),
ConnectionType::Ble => {
if let Some(address) = &self.config.ble.address {
format!("BLE {} ({})", self.config.ble.name, address)
} else {
format!("BLE {} (auto-discovery)", self.config.ble.name)
}
}
}
}
pub async fn connect(&self) {
self.shared.running.store(true, Ordering::Relaxed);
match self.config.connection_type {
ConnectionType::Serial => self.run_serial_loop().await,
ConnectionType::Tcp => self.run_tcp_loop().await,
ConnectionType::Ble => self.run_ble_loop().await,
}
}
pub async fn close(&self) {
self.shared.running.store(false, Ordering::Relaxed);
self.shared.is_connected.store(false, Ordering::Relaxed);
self.shared.is_reading.store(false, Ordering::Relaxed);
*self.shared.serial_number.lock().unwrap() = None;
*self.shared.writer.lock().await = None;
*self.shared.ble_write_tx.lock().await = None;
dispatch_event(
&self.on_event,
&self.config.name,
&X714Event::Connection(false),
);
}
pub async fn write(&self, command: &str) -> Result<(), String> {
let payload_ble = command.trim().as_bytes().to_vec();
let frame_stream = format!("{}\n", command.trim()).into_bytes();
{
let guard = self.shared.ble_write_tx.lock().await;
if let Some(sender) = guard.as_ref() {
return sender
.send(payload_ble)
.map_err(|e| format!("BLE write channel closed: {e}"));
}
}
use tokio::io::AsyncWriteExt;
let mut guard = self.shared.writer.lock().await;
if let Some(writer) = guard.as_mut() {
writer
.write_all(&frame_stream)
.await
.map_err(|e| format!("write error: {e}"))
} else {
Err("not connected".to_string())
}
}
pub fn on_receive(&self, data: &str) {
let events = parse_line_to_events(data);
for event in &events {
self.apply_event_to_state(event);
dispatch_event(&self.on_event, &self.config.name, event);
}
}
pub(crate) fn process_incoming_chunk(&self, chunk: &str, buffer: &mut String) {
buffer.push_str(chunk);
buffer.retain(|c| c != '\0');
loop {
let next_lf = buffer.find('\n');
let next_cr = buffer.find('\r');
let pos = match (next_lf, next_cr) {
(Some(a), Some(b)) => a.min(b),
(Some(a), None) => a,
(None, Some(b)) => b,
(None, None) => break,
};
if pos == 0 {
buffer.drain(..=pos);
continue;
}
let line: String = buffer.drain(..=pos).collect();
let trimmed = line.trim_matches(|c| c == '\r' || c == '\n').trim();
if !trimmed.is_empty() {
self.on_receive(trimmed);
}
}
loop {
let Some(first_hash) = buffer.find('#') else {
buffer.clear();
break;
};
if first_hash > 0 {
buffer.drain(..first_hash);
}
let rest = &buffer[1..];
let Some(next_rel) = rest.find('#') else {
break;
};
let split_at = 1 + next_rel;
let frame = buffer[..split_at].trim();
if !frame.is_empty() {
self.on_receive(frame);
}
buffer.drain(..split_at);
}
let trimmed = buffer.trim();
let lower = trimmed.to_lowercase();
let looks_complete_tag = if let Some(payload) = lower.strip_prefix("#t+@") {
let payload = payload.trim();
let simple_hex = !payload.is_empty()
&& payload.len() >= 8
&& payload.chars().all(|c| c.is_ascii_hexdigit());
payload.contains('|') || simple_hex
} else {
false
};
let looks_complete_meta = lower.starts_with("#read:")
|| lower.starts_with("#start_reading:")
|| lower.starts_with("#name:")
|| lower == "#setup_done"
|| lower == "#tags_cleared"
|| lower == "#pong";
if !trimmed.is_empty() && (looks_complete_tag || looks_complete_meta) {
self.on_receive(trimmed);
buffer.clear();
}
}
pub fn parse_line(&self, input: &str) -> Vec<X714Event> {
let events = parse_line_to_events(input);
for event in &events {
self.apply_event_to_state(event);
dispatch_event(&self.on_event, &self.config.name, event);
}
events
}
pub(crate) fn apply_event_to_state(&self, event: &X714Event) {
match event {
X714Event::Reading(v) => self.shared.is_reading.store(*v, Ordering::Relaxed),
X714Event::SerialNumber(s) => {
*self.shared.serial_number.lock().unwrap() = Some(s.clone());
}
_ => {}
}
}
pub(crate) async fn on_connected(&self) {
self.shared.is_connected.store(true, Ordering::Relaxed);
dispatch_event(
&self.on_event,
&self.config.name,
&X714Event::Connection(true),
);
self.config_reader().await.ok();
if self.config.start_reading && !self.config.gpi_start {
self.start_inventory().await.ok();
}
}
pub(crate) fn on_disconnected(&self) {
self.shared.is_connected.store(false, Ordering::Relaxed);
self.shared.is_reading.store(false, Ordering::Relaxed);
*self.shared.serial_number.lock().unwrap() = None;
dispatch_event(
&self.on_event,
&self.config.name,
&X714Event::Connection(false),
);
}
pub fn on_start(&self) {
self.shared.is_reading.store(true, Ordering::Relaxed);
dispatch_event(&self.on_event, &self.config.name, &X714Event::Reading(true));
}
pub fn on_stop(&self) {
self.shared.is_reading.store(false, Ordering::Relaxed);
dispatch_event(
&self.on_event,
&self.config.name,
&X714Event::Reading(false),
);
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use serde_json::{Number, Value};
use super::*;
#[test]
fn from_map_builds_reader_and_antennas() {
let params = HashMap::from([
(
"connection_type".to_string(),
Value::String("tcp".to_string()),
),
("ip".to_string(), Value::String("10.0.0.40".to_string())),
("tcp_port".to_string(), Value::Number(Number::from(23))),
(
"active_ant".to_string(),
Value::Array(vec![
Value::Number(Number::from(1)),
Value::Number(Number::from(3)),
]),
),
("read_power".to_string(), Value::Number(Number::from(25))),
]);
let reader = X714::from_map(params).expect("valid map");
assert_eq!(reader.config.connection_type, ConnectionType::Tcp);
assert_eq!(reader.config.tcp.ip, "10.0.0.40");
assert!(reader.config.ant_dict.get("1").expect("ant1").active);
assert!(!reader.config.ant_dict.get("2").expect("ant2").active);
assert!(reader.config.ant_dict.get("3").expect("ant3").active);
assert_eq!(reader.config.ant_dict.get("4").expect("ant4").power, 25);
}
#[test]
fn parse_tag_line() {
let reader = X714::default();
let events = reader.parse_line("#T+@E28011|E20033|2|58|LOCKED");
assert_eq!(events.len(), 1);
match &events[0] {
X714Event::Tag(tag) => {
assert_eq!(tag.epc.as_deref(), Some("e28011"));
assert_eq!(tag.tid.as_deref(), Some("e20033"));
assert_eq!(tag.ant, 2);
assert_eq!(tag.rssi, -58);
assert_eq!(tag.protected.as_deref(), Some("locked"));
}
_ => panic!("unexpected event"),
}
}
#[test]
fn reading_state_updated_by_parse_line() {
let reader = X714::default();
assert!(!reader.is_reading());
reader.parse_line("#read:on");
assert!(reader.is_reading());
reader.parse_line("#read:off");
assert!(!reader.is_reading());
}
#[test]
fn serial_number_updated_by_parse_line() {
let reader = X714::default();
reader.parse_line("#name:X714-ABC123");
assert_eq!(reader.serial_number().as_deref(), Some("x714-abc123"));
}
#[test]
fn clone_shares_runtime_state() {
let reader = X714::default();
let clone = reader.clone();
reader.parse_line("#read:on");
assert!(clone.is_reading()); }
#[test]
fn config_commands_include_core_flags() {
let mut reader = X714::default();
reader.config.session = 2;
reader.config.start_reading = true;
let cmds = reader.config_commands();
assert!(cmds.iter().any(|c| c == "#session:2"));
assert!(cmds.iter().any(|c| c == "#start_reading:on"));
assert!(cmds.iter().any(|c| c == "#setup_reader"));
}
#[test]
fn process_incoming_chunk_parses_cr_delimited_frames() {
let reader = X714::default();
let mut buffer = String::new();
reader.process_incoming_chunk("#read:on\r#name:X714-ABC\r", &mut buffer);
assert!(reader.is_reading());
assert_eq!(reader.serial_number().as_deref(), Some("x714-abc"));
assert!(buffer.is_empty());
}
#[test]
fn process_incoming_chunk_parses_concatenated_hash_frames() {
let reader = X714::default();
let mut buffer = String::new();
reader.process_incoming_chunk("#read:on#read:off", &mut buffer);
assert!(!reader.is_reading());
assert!(buffer.is_empty());
}
}