use nscldaq_ringbuffer::ringbuffer::{consumer, producer, RingBufferMap};
use portman_client;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::path;
use std::process;
use std::sync::{Arc, Mutex};
/// Failures that can be reported while attaching a client to a ring buffer
/// and registering it with the ring master.
pub enum Error {
/// Attaching a consumer to the mapped ring failed; wraps the consumer module's error.
ConsumerError(consumer::Error),
/// Attaching a producer to the mapped ring failed; wraps the producer module's error.
ProducerError(producer::Error),
/// `RingBufferMap::new` failed; carries the message it returned.
MapError(String),
/// The port manager lookup for the ring master service failed.
PortManError(portman_client::Error),
/// The port manager lists no "RingMaster" service, or connecting to,
/// writing to, or reading from the ring master failed.
NoRingMaster,
/// The ring master answered with something other than `OK`; carries the reply line as received.
RingMasterFail(String),
/// Not constructed anywhere in the visible code; displays as
/// "Unimplemented operation attempted".
Unimplemented,
}
impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let stringified = match self {
Error::ConsumerError(e) => {
format!("Consumer error {}", consumer::error_string(e))
}
Error::ProducerError(e) => {
format!("Producer error {}", producer::error_string(e))
}
Error::MapError(s) => format!("Ring Buffer Mapping error {}", s),
Error::PortManError(e) => {
format!("Error interacting with port manager: {}", e.to_string())
}
Error::NoRingMaster => format!("The ring master is not running"),
Error::RingMasterFail(s) => format!("Interaction with ringmaster failed: {}", s),
Error::Unimplemented => String::from("Unimplemented operation attempted"),
};
write!(f, "{}", stringified)
}
}
/// The ring-buffer endpoint held by a `RingClient`: either a consumer or a producer.
pub enum ClientType {
/// A consumer attachment, as stored by `attach_consumer`.
Consumer(consumer::Consumer),
/// A producer attachment, as stored by `attach_producer`.
Producer(producer::Producer),
}
/// A ring-buffer client paired with the TCP connection on which it
/// registered with the ring master.
///
/// Built by `attach_consumer` / `attach_producer` once the ring master has
/// answered the CONNECT request with `OK`.
#[allow(dead_code)]
#[allow(unused_variables)]
pub struct RingClient {
/// The attached consumer or producer.
pub client: ClientType,
// Stream returned by `ringmaster_request`; it is stored but never read in
// the visible code (hence the `dead_code` allowance above).
// NOTE(review): presumably held open so the ring master can detect when
// the client goes away — confirm against the ring master protocol.
ring_master: TcpStream,
}
/// Port number handed to `portman_client::Client::new` when looking up the
/// ring master; starts at 30000.
///
/// Written by `set_portman_port` and read by `get_ringmaster_port`.
// NOTE(review): this is a `static mut` accessed without any synchronization;
// concurrent calls to the setter and an attach function would be a data race.
// An `AtomicU16` would remove the `unsafe` — needs a coordinated change in
// both accessors.
#[allow(non_upper_case_globals)]
static mut portman_port: u16 = 30000;
/// Result of an attach operation: the connected `RingClient` on success, an `Error` otherwise.
pub type RingClientResult = Result<RingClient, Error>;
/// Overrides the port used to contact the port manager.
///
/// Subsequent ring master lookups use `new_port` instead of the initial
/// value of the `portman_port` static.
pub fn set_portman_port(new_port: u16) {
    // SAFETY: plain store to a `static mut`. This is only sound while no
    // other thread is reading or writing `portman_port` at the same time;
    // nothing in this function enforces that, so callers must.
    unsafe {
        portman_port = new_port;
    }
}
pub fn attach_consumer(ring_buffer_file: &str) -> RingClientResult {
match get_ringmaster_port() {
Ok(port) => match RingBufferMap::new(ring_buffer_file) {
Ok(raw_map) => {
let safe_map = Arc::new(Mutex::new(raw_map));
match consumer::Consumer::attach(&Arc::clone(&safe_map)) {
Ok(consumer) => {
let slot = consumer.get_index();
match connect_consumer(port, &ring_name(&ring_buffer_file), slot) {
Err(e) => Err(e),
Ok(stream) => Ok(RingClient {
client: ClientType::Consumer(consumer),
ring_master: stream,
}),
}
}
Err(e) => Err(Error::ConsumerError(e)),
}
}
Err(s) => Err(Error::MapError(s)),
},
Err(e) => Err(e),
}
}
pub fn attach_producer(ring_buffer_file: &str) -> RingClientResult {
match get_ringmaster_port() {
Ok(port) => match RingBufferMap::new(ring_buffer_file) {
Ok(raw_map) => {
let safe_map = Arc::new(Mutex::new(raw_map));
match producer::Producer::attach(&Arc::clone(&safe_map)) {
Ok(producer) => match connect_producer(port, &ring_name(&ring_buffer_file)) {
Err(e) => Err(e),
Ok(stream) => Ok(RingClient {
client: ClientType::Producer(producer),
ring_master: stream,
}),
},
Err(e) => Err(Error::ProducerError(e)),
}
}
Err(s) => Err(Error::MapError(s)),
},
Err(e) => Err(e),
}
}
fn get_ringmaster_port() -> Result<u16, Error> {
let port = unsafe { portman_port };
let mut client = portman_client::Client::new(port);
match client.find_by_service("RingMaster") {
Err(e) => Err(Error::PortManError(e)),
Ok(v) => {
if v.len() == 0 {
Err(Error::NoRingMaster)
} else {
Ok(v[0].port) }
}
}
}
/// Derives a ring name from the path of its backing file.
///
/// Returns the final component of `filename` (for example `"fox"` for
/// `"/dev/shm/fox"`). If the path has no final component — it is empty, is
/// the root, or ends in `..` — the input is returned unchanged instead of
/// panicking.
fn ring_name(filename: &str) -> String {
    path::Path::new(filename)
        .file_name()
        // The component is a slice of a `&str` input, so this conversion is
        // not expected to fail; the fallback below covers it regardless.
        .and_then(|name| name.to_str())
        .unwrap_or(filename)
        .to_string()
}
/// Registers a consumer with the ring master listening on `port`.
///
/// Sends the line `CONNECT {<ring>} consumer.<slot> <pid> RUST Client`, where
/// `<pid>` is this process's id, and returns whatever `ringmaster_request`
/// yields for it.
fn connect_consumer(port: u16, ring: &str, slot: u32) -> Result<TcpStream, Error> {
    let pid = process::id();
    let message = format!("CONNECT {{{}}} consumer.{} {} RUST Client\n", ring, slot, pid);
    ringmaster_request(port, &message)
}
/// Registers the producer with the ring master listening on `port`.
///
/// Sends the line `CONNECT {<ring>} producer <pid> RUST Client`, where
/// `<pid>` is this process's id, and returns whatever `ringmaster_request`
/// yields for it.
fn connect_producer(port: u16, ring: &str) -> Result<TcpStream, Error> {
    let pid = process::id();
    let message = format!("CONNECT {{{}}} producer {} RUST Client\n", ring, pid);
    ringmaster_request(port, &message)
}
fn ringmaster_request(port: u16, request: &str) -> Result<TcpStream, Error> {
match TcpStream::connect(format!("127.0.0.1:{}", port).as_str()) {
Err(_) => Err(Error::NoRingMaster),
Ok(mut stream) => {
if let Err(_) = stream.write_all(request.as_bytes()) {
Err(Error::NoRingMaster)
} else {
if let Err(_) = stream.flush() {
Err(Error::NoRingMaster)
} else {
let mut reader = BufReader::new(stream.try_clone().unwrap());
let mut line = String::new();
if let Ok(_n) = reader.read_line(&mut line) {
if line.trim() == "OK" {
Ok(stream)
} else {
Err(Error::RingMasterFail(line))
}
} else {
Err(Error::NoRingMaster)
}
}
}
}
}
}