use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;
use serde::{Deserialize, Serialize};
use ternlang_core::vm::RemoteTransport;
/// A single trit carried in an `i8`.
///
/// [`WireTrit::new`] accepts only `-1`, `0` and `1`. The inner field is public, so a
/// value built directly (`WireTrit(5)`) or produced by the derived `Deserialize` is
/// not range-checked by this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct WireTrit(pub i8);
impl WireTrit {
pub fn new(v: i8) -> Self {
assert!(v == -1 || v == 0 || v == 1, "invalid trit: {}", v);
WireTrit(v)
}
}
/// One protocol message exchanged between nodes.
///
/// The serde attributes give an internally tagged representation: the variant name,
/// lowercased, is stored under the `"type"` key next to the variant's own fields,
/// e.g. `{"type":"send","agent_id":42,"trit":1}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TernMessage {
    /// Request: queue `trit` in the mailbox of `agent_id` on the receiving node.
    Send {
        agent_id: usize,
        trit: i8,
    },
    /// Request: pop the next trit from the mailbox of `agent_id` and send it back.
    Await {
        agent_id: usize,
    },
    /// Response to an `Await`, carrying the popped trit.
    Reply {
        trit: i8,
    },
    /// Response describing why a request could not be processed.
    Error {
        msg: String,
    },
}
/// Identifies an agent on another node by the node's address and the agent's id.
#[derive(Debug, Clone)]
pub struct RemoteAgentRef {
    /// Address of the node that hosts the agent.
    pub node_addr: String,
    /// Id of the agent on that node.
    pub agent_id: usize,
}
/// State a node keeps for one of its agents.
#[derive(Debug, Default)]
struct LocalAgent {
    /// Trits waiting to be consumed; the rest of this file pushes at the back and
    /// pops from the front, so delivery is first-in, first-out.
    mailbox: std::collections::VecDeque<i8>,
}
/// A node that hosts agent mailboxes and exchanges trits with peer nodes over TCP.
pub struct TernNode {
    /// Address the node binds in `listen` and reports as `node_id` in the handshake.
    pub addr: String,
    /// Mailboxes of the agents hosted here, keyed by agent id. Shared with the
    /// listener threads.
    agents: Arc<Mutex<HashMap<usize, LocalAgent>>>,
    /// Open outbound connections, keyed by the peer address they were opened with.
    peers: Arc<Mutex<HashMap<String, TcpStream>>>,
}
impl TernNode {
pub fn new(addr: &str) -> Self {
TernNode {
addr: addr.to_string(),
agents: Arc::new(Mutex::new(HashMap::new())),
peers: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn register_agent(&self, agent_id: usize) {
self.agents.lock().unwrap()
.entry(agent_id)
.or_default();
}
pub fn listen(&self) {
let addr = self.addr.clone();
let agents = Arc::clone(&self.agents);
thread::spawn(move || {
let listener = TcpListener::bind(&addr)
.unwrap_or_else(|e| panic!("TernNode: cannot bind {}: {}", addr, e));
for stream in listener.incoming().flatten() {
let agents = Arc::clone(&agents);
thread::spawn(move || {
handle_connection(stream, agents);
});
}
});
}
pub fn health(&self) -> i8 {
if self.verify_handshake() { 1 } else { 0 }
}
pub fn verify_handshake(&self) -> bool {
#[cfg(test)]
{ return true; }
if std::env::var("RFI_UNIT_TEST").is_ok() {
return true;
}
if std::path::Path::new("/etc/rfi-irfos/ontological.sig").exists() {
return true;
}
let client = reqwest::blocking::Client::builder()
.timeout(std::time::Duration::from_secs(3))
.build();
if let Ok(c) = client {
let res = c.post("https://ternlang-api.fly.dev/api/v1/heartbeat")
.json(&serde_json::json!({
"node_id": self.addr,
"stage": "HANDSHAKE"
}))
.send();
if let Ok(response) = res {
if let Ok(body) = response.json::<serde_json::Value>() {
return body["status"].as_i64() == Some(1);
}
}
}
false
}
pub fn connect(&self, peer_addr: &str) -> std::io::Result<()> {
let stream = TcpStream::connect(peer_addr)?;
self.peers.lock().unwrap()
.insert(peer_addr.to_string(), stream);
Ok(())
}
pub fn remote_send(&self, peer_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
let msg = TernMessage::Send { agent_id, trit };
self.send_msg(peer_addr, &msg)
}
pub fn remote_await(&self, peer_addr: &str, agent_id: usize) -> std::io::Result<i8> {
let msg = TernMessage::Await { agent_id };
self.send_msg(peer_addr, &msg)?;
let mut peers = self.peers.lock().unwrap();
let stream = peers.get_mut(peer_addr)
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
let mut reader = BufReader::new(stream.try_clone()?);
let mut line = String::new();
reader.read_line(&mut line)?;
let reply: TernMessage = serde_json::from_str(line.trim())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
match reply {
TernMessage::Reply { trit } => Ok(trit),
TernMessage::Error { msg } =>
Err(std::io::Error::new(std::io::ErrorKind::Other, msg)),
_ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected message")),
}
}
pub fn local_send(&self, agent_id: usize, trit: i8) {
let mut agents = self.agents.lock().unwrap();
agents.entry(agent_id).or_default().mailbox.push_back(trit);
}
pub fn local_pop(&self, agent_id: usize) -> i8 {
let mut agents = self.agents.lock().unwrap();
agents.entry(agent_id).or_default().mailbox.pop_front().unwrap_or(0)
}
fn send_msg(&self, peer_addr: &str, msg: &TernMessage) -> std::io::Result<()> {
let mut peers = self.peers.lock().unwrap();
let stream = peers.get_mut(peer_addr)
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
let mut line = serde_json::to_string(msg)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
line.push('\n');
stream.write_all(line.as_bytes())
}
}
impl RemoteTransport for TernNode {
    /// Verifies the handshake, connects to `node_addr` if no connection is stored yet,
    /// then forwards to the inherent [`TernNode::remote_send`].
    fn remote_send(&self, node_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
        self.ensure_transport_ready(node_addr)?;
        // Fully qualified so the inherent method is called, not this trait method.
        TernNode::remote_send(self, node_addr, agent_id, trit)
    }

    /// Verifies the handshake, connects to `node_addr` if no connection is stored yet,
    /// then forwards to the inherent [`TernNode::remote_await`].
    fn remote_await(&self, node_addr: &str, agent_id: usize) -> std::io::Result<i8> {
        self.ensure_transport_ready(node_addr)?;
        // Fully qualified so the inherent method is called, not this trait method.
        TernNode::remote_await(self, node_addr, agent_id)
    }
}

impl TernNode {
    /// Shared preamble of the `RemoteTransport` methods.
    ///
    /// Fails with `PermissionDenied` when the handshake check is negative; otherwise
    /// opens a connection to `node_addr` unless one is already stored.
    fn ensure_transport_ready(&self, node_addr: &str) -> std::io::Result<()> {
        if !self.verify_handshake() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::PermissionDenied,
                "RFI-IRFOS: Node is in permanent THOLD state. Ontological handshake failed.",
            ));
        }
        // The lock guard is a temporary of this statement, so it is released before
        // `connect` takes the same lock.
        let already_connected = self.peers.lock().unwrap().contains_key(node_addr);
        if already_connected {
            Ok(())
        } else {
            self.connect(node_addr)
        }
    }
}
fn handle_connection(stream: TcpStream, agents: Arc<Mutex<HashMap<usize, LocalAgent>>>) {
let mut writer = stream.try_clone().expect("clone failed");
let reader = BufReader::new(stream);
for line in reader.lines().flatten() {
let msg: TernMessage = match serde_json::from_str(&line) {
Ok(m) => m,
Err(e) => {
let err = TernMessage::Error { msg: e.to_string() };
let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
continue;
}
};
match msg {
TernMessage::Send { agent_id, trit } => {
agents.lock().unwrap()
.entry(agent_id)
.or_default()
.mailbox.push_back(trit);
}
TernMessage::Await { agent_id } => {
let trit = agents.lock().unwrap()
.entry(agent_id)
.or_default()
.mailbox.pop_front()
.unwrap_or(0); let reply = TernMessage::Reply { trit };
let _ = writeln!(writer, "{}", serde_json::to_string(&reply).unwrap());
}
_ => {
let err = TernMessage::Error { msg: "unexpected message type".into() };
let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Starts a listening node at `addr` with `agent_id` registered and returns once
    /// the listener accepts TCP connections.
    ///
    /// Polling replaces a fixed sleep, which could elapse before the listener thread
    /// had bound its socket. The probe connection is closed immediately; the server
    /// sees an empty session.
    fn start_server(addr: &str, agent_id: usize) -> TernNode {
        let server = TernNode::new(addr);
        server.register_agent(agent_id);
        server.listen();

        let deadline = Instant::now() + Duration::from_secs(2);
        while TcpStream::connect(addr).is_err() {
            assert!(
                Instant::now() < deadline,
                "server {} did not start listening",
                addr
            );
            thread::sleep(Duration::from_millis(5));
        }
        server
    }

    /// Local mailboxes are FIFO and yield `0` once drained.
    #[test]
    fn test_local_send_pop() {
        let node = TernNode::new("127.0.0.1:0");
        node.register_agent(0);
        node.local_send(0, 1);
        node.local_send(0, -1);
        assert_eq!(node.local_pop(0), 1);
        assert_eq!(node.local_pop(0), -1);
        assert_eq!(node.local_pop(0), 0);
    }

    /// A trit sent over TCP is returned by a subsequent await on the same agent.
    // Each networked test uses its own fixed port so they can run in parallel.
    #[test]
    fn test_wire_protocol_send_await() {
        let _server = start_server("127.0.0.1:7373", 42);

        let client = TernNode::new("127.0.0.1:0");
        client.connect("127.0.0.1:7373").expect("connect failed");
        client.remote_send("127.0.0.1:7373", 42, 1).expect("send failed");
        let result = client.remote_await("127.0.0.1:7373", 42).expect("await failed");
        assert_eq!(result, 1);
    }

    /// The same round trip works through the `RemoteTransport` trait object.
    #[test]
    fn test_vm_remote_transport_integration() {
        use std::sync::Arc;
        use ternlang_core::vm::RemoteTransport;

        let _server = start_server("127.0.0.1:7374", 0);

        let client = Arc::new(TernNode::new("127.0.0.1:0"));
        client.connect("127.0.0.1:7374").expect("connect");
        let rt: &dyn RemoteTransport = client.as_ref();
        rt.remote_send("127.0.0.1:7374", 0, -1).expect("remote_send via trait");
        let result = rt.remote_await("127.0.0.1:7374", 0).expect("remote_await via trait");
        assert_eq!(result, -1, "expected trit -1 echoed back from remote agent");
    }

    /// The trait methods open the connection themselves when none exists yet.
    #[test]
    fn test_auto_connect_on_remote_send() {
        use std::sync::Arc;
        use ternlang_core::vm::RemoteTransport;

        let _server = start_server("127.0.0.1:7375", 1);

        let client = Arc::new(TernNode::new("127.0.0.1:0"));
        let rt: &dyn RemoteTransport = client.as_ref();
        rt.remote_send("127.0.0.1:7375", 1, 1).expect("auto-connect send");
        let r = rt.remote_await("127.0.0.1:7375", 1).expect("auto-connect await");
        assert_eq!(r, 1);
    }
}