1use std::collections::HashMap;
22use std::io::{BufRead, BufReader, Write};
23use std::net::{TcpListener, TcpStream};
24use std::sync::{Arc, Mutex};
25use std::thread;
26
27use serde::{Deserialize, Serialize};
28use ternlang_core::vm::RemoteTransport;
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32pub struct WireTrit(pub i8);
33
34impl WireTrit {
35 pub fn new(v: i8) -> Self {
36 assert!(v == -1 || v == 0 || v == 1, "invalid trit: {}", v);
37 WireTrit(v)
38 }
39}
40
41#[derive(Debug, Serialize, Deserialize)]
43#[serde(tag = "type", rename_all = "lowercase")]
44pub enum TernMessage {
45 Send { agent_id: usize, trit: i8 },
47 Await { agent_id: usize },
49 Reply { trit: i8 },
51 Error { msg: String },
53}
54
55#[derive(Debug, Clone)]
57pub struct RemoteAgentRef {
58 pub node_addr: String,
59 pub agent_id: usize,
60}
61
62#[derive(Debug, Default)]
64struct LocalAgent {
65 mailbox: std::collections::VecDeque<i8>,
66}
67
68pub struct TernNode {
71 pub addr: String,
72 agents: Arc<Mutex<HashMap<usize, LocalAgent>>>,
74 peers: Arc<Mutex<HashMap<String, TcpStream>>>,
76}
77
78impl TernNode {
79 pub fn new(addr: &str) -> Self {
80 TernNode {
81 addr: addr.to_string(),
82 agents: Arc::new(Mutex::new(HashMap::new())),
83 peers: Arc::new(Mutex::new(HashMap::new())),
84 }
85 }
86
87 pub fn register_agent(&self, agent_id: usize) {
89 self.agents.lock().unwrap()
90 .entry(agent_id)
91 .or_default();
92 }
93
94 pub fn listen(&self) {
97 let addr = self.addr.clone();
98 let agents = Arc::clone(&self.agents);
99
100 thread::spawn(move || {
101 let listener = TcpListener::bind(&addr)
102 .unwrap_or_else(|e| panic!("TernNode: cannot bind {}: {}", addr, e));
103 for stream in listener.incoming().flatten() {
104 let agents = Arc::clone(&agents);
105 thread::spawn(move || {
106 handle_connection(stream, agents);
107 });
108 }
109 });
110 }
111
112 pub fn connect(&self, peer_addr: &str) -> std::io::Result<()> {
114 let stream = TcpStream::connect(peer_addr)?;
115 self.peers.lock().unwrap()
116 .insert(peer_addr.to_string(), stream);
117 Ok(())
118 }
119
120 pub fn remote_send(&self, peer_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
122 let msg = TernMessage::Send { agent_id, trit };
123 self.send_msg(peer_addr, &msg)
124 }
125
126 pub fn remote_await(&self, peer_addr: &str, agent_id: usize) -> std::io::Result<i8> {
128 let msg = TernMessage::Await { agent_id };
129 self.send_msg(peer_addr, &msg)?;
130 let mut peers = self.peers.lock().unwrap();
132 let stream = peers.get_mut(peer_addr)
133 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
134 let mut reader = BufReader::new(stream.try_clone()?);
135 let mut line = String::new();
136 reader.read_line(&mut line)?;
137 let reply: TernMessage = serde_json::from_str(line.trim())
138 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
139 match reply {
140 TernMessage::Reply { trit } => Ok(trit),
141 TernMessage::Error { msg } =>
142 Err(std::io::Error::new(std::io::ErrorKind::Other, msg)),
143 _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected message")),
144 }
145 }
146
147 pub fn local_send(&self, agent_id: usize, trit: i8) {
149 let mut agents = self.agents.lock().unwrap();
150 agents.entry(agent_id).or_default().mailbox.push_back(trit);
151 }
152
153 pub fn local_pop(&self, agent_id: usize) -> i8 {
155 let mut agents = self.agents.lock().unwrap();
156 agents.entry(agent_id).or_default().mailbox.pop_front().unwrap_or(0)
157 }
158
159 fn send_msg(&self, peer_addr: &str, msg: &TernMessage) -> std::io::Result<()> {
160 let mut peers = self.peers.lock().unwrap();
161 let stream = peers.get_mut(peer_addr)
162 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
163 let mut line = serde_json::to_string(msg)
164 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
165 line.push('\n');
166 stream.write_all(line.as_bytes())
167 }
168}
169
170impl RemoteTransport for TernNode {
175 fn remote_send(&self, node_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
176 if !self.peers.lock().unwrap().contains_key(node_addr) {
178 self.connect(node_addr)?;
179 }
180 TernNode::remote_send(self, node_addr, agent_id, trit)
181 }
182
183 fn remote_await(&self, node_addr: &str, agent_id: usize) -> std::io::Result<i8> {
184 if !self.peers.lock().unwrap().contains_key(node_addr) {
185 self.connect(node_addr)?;
186 }
187 TernNode::remote_await(self, node_addr, agent_id)
188 }
189}
190
191fn handle_connection(stream: TcpStream, agents: Arc<Mutex<HashMap<usize, LocalAgent>>>) {
195 let mut writer = stream.try_clone().expect("clone failed");
196 let reader = BufReader::new(stream);
197 for line in reader.lines().flatten() {
198 let msg: TernMessage = match serde_json::from_str(&line) {
199 Ok(m) => m,
200 Err(e) => {
201 let err = TernMessage::Error { msg: e.to_string() };
202 let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
203 continue;
204 }
205 };
206 match msg {
207 TernMessage::Send { agent_id, trit } => {
208 agents.lock().unwrap()
209 .entry(agent_id)
210 .or_default()
211 .mailbox.push_back(trit);
212 }
214 TernMessage::Await { agent_id } => {
215 let trit = agents.lock().unwrap()
216 .entry(agent_id)
217 .or_default()
218 .mailbox.pop_front()
219 .unwrap_or(0); let reply = TernMessage::Reply { trit };
221 let _ = writeln!(writer, "{}", serde_json::to_string(&reply).unwrap());
222 }
223 _ => {
224 let err = TernMessage::Error { msg: "unexpected message type".into() };
225 let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
226 }
227 }
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use std::time::Duration;
235
236 #[test]
237 fn test_local_send_pop() {
238 let node = TernNode::new("127.0.0.1:0"); node.register_agent(0);
240 node.local_send(0, 1);
241 node.local_send(0, -1);
242 assert_eq!(node.local_pop(0), 1);
243 assert_eq!(node.local_pop(0), -1);
244 assert_eq!(node.local_pop(0), 0); }
246
247 #[test]
248 fn test_wire_protocol_send_await() {
249 let server = TernNode::new("127.0.0.1:7373");
251 server.register_agent(42);
252 server.listen();
253 thread::sleep(Duration::from_millis(50)); let client = TernNode::new("127.0.0.1:0");
257 client.connect("127.0.0.1:7373").expect("connect failed");
258 client.remote_send("127.0.0.1:7373", 42, 1).expect("send failed");
259
260 let result = client.remote_await("127.0.0.1:7373", 42).expect("await failed");
262 assert_eq!(result, 1);
263 }
264
265 #[test]
269 fn test_vm_remote_transport_integration() {
270 use ternlang_core::vm::{BetVm, Value, RemoteTransport};
271 use ternlang_core::trit::Trit;
272 use std::sync::Arc;
273
274 let server = Arc::new(TernNode::new("127.0.0.1:7374"));
276 server.register_agent(0);
277 server.listen();
278 thread::sleep(Duration::from_millis(50));
279
280 let client = Arc::new(TernNode::new("127.0.0.1:0"));
282
283 client.connect("127.0.0.1:7374").expect("connect");
302 let rt: &dyn RemoteTransport = client.as_ref();
304 rt.remote_send("127.0.0.1:7374", 0, -1).expect("remote_send via trait");
305 let result = rt.remote_await("127.0.0.1:7374", 0).expect("remote_await via trait");
306 assert_eq!(result, -1, "expected trit -1 echoed back from remote agent");
307 }
308
309 #[test]
311 fn test_auto_connect_on_remote_send() {
312 use ternlang_core::vm::RemoteTransport;
313 use std::sync::Arc;
314
315 let server = Arc::new(TernNode::new("127.0.0.1:7375"));
316 server.register_agent(1);
317 server.listen();
318 thread::sleep(Duration::from_millis(50));
319
320 let client = Arc::new(TernNode::new("127.0.0.1:0"));
322 let rt: &dyn RemoteTransport = client.as_ref();
323 rt.remote_send("127.0.0.1:7375", 1, 1).expect("auto-connect send");
325 let r = rt.remote_await("127.0.0.1:7375", 1).expect("auto-connect await");
326 assert_eq!(r, 1);
327 }
328}