Skip to main content

ternlang_runtime/
lib.rs

1// SPDX-License-Identifier: LicenseRef-Ternlang-Commercial
2// Ternlang — RFI-IRFOS Ternary Intelligence Stack
3// Copyright (C) 2026 RFI-IRFOS. All rights reserved.
4// Commercial tier. See LICENSE-COMMERCIAL in the repository root.
5// Unauthorized use, copying, or distribution is prohibited.
6
7//! ternlang-runtime — Distributed actor runtime for ternlang
8//!
9//! Phase 5.1: synchronous TCP transport for remote agent communication.
10//!
11//! Protocol: newline-delimited JSON over TCP.
12//! Each message is a single JSON object followed by '\n'.
13//!
14//! Message types:
15//!   {"type":"send",  "agent_id": 0, "trit": 1}     → send trit to local agent
16//!   {"type":"await", "agent_id": 0}                 → run agent handler, return result
17//!   {"type":"reply", "trit": 1}                     → response to await
18//!   {"type":"error", "msg": "..."}                  → error response
19//!
20//! Usage:
21//!   let node = TernNode::new("127.0.0.1:7373");
22//!   node.listen();                  // spawns listener thread
23//!   node.connect("127.0.0.1:7374"); // connect to peer
24//!   node.remote_send("127.0.0.1:7374", 0, 1);  // send +1 to remote agent 0
25//!   let result = node.remote_await("127.0.0.1:7374", 0); // get reply
26
27use std::collections::HashMap;
28use std::io::{BufRead, BufReader, Write};
29use std::net::{TcpListener, TcpStream};
30use std::sync::{Arc, Mutex};
31use std::thread;
32
33use serde::{Deserialize, Serialize};
34use ternlang_core::vm::RemoteTransport;
35
36/// A trit value serialized over the wire: -1, 0, or +1.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub struct WireTrit(pub i8);
39
40impl WireTrit {
41    pub fn new(v: i8) -> Self {
42        assert!(v == -1 || v == 0 || v == 1, "invalid trit: {}", v);
43        WireTrit(v)
44    }
45}
46
47/// Wire protocol message.
48#[derive(Debug, Serialize, Deserialize)]
49#[serde(tag = "type", rename_all = "lowercase")]
50pub enum TernMessage {
51    /// Send a trit message to a local agent's mailbox.
52    Send  { agent_id: usize, trit: i8 },
53    /// Execute the agent's handler with its pending message, return the result.
54    Await { agent_id: usize },
55    /// Successful reply to an Await.
56    Reply { trit: i8 },
57    /// Error response.
58    Error { msg: String },
59}
60
61/// A remote agent reference: identifies an agent on a specific node.
62#[derive(Debug, Clone)]
63pub struct RemoteAgentRef {
64    pub node_addr: String,
65    pub agent_id: usize,
66}
67
68/// Local agent record: mailbox of pending trit messages.
69#[derive(Debug, Default)]
70struct LocalAgent {
71    mailbox: std::collections::VecDeque<i8>,
72}
73
74/// The ternlang distributed node.
75/// Manages local agent mailboxes and TCP connections to peer nodes.
76pub struct TernNode {
77    pub addr: String,
78    /// Local agents indexed by agent_id.
79    agents: Arc<Mutex<HashMap<usize, LocalAgent>>>,
80    /// Open connections to peer nodes: addr → stream.
81    peers: Arc<Mutex<HashMap<String, TcpStream>>>,
82}
83
84impl TernNode {
85    pub fn new(addr: &str) -> Self {
86        TernNode {
87            addr: addr.to_string(),
88            agents: Arc::new(Mutex::new(HashMap::new())),
89            peers: Arc::new(Mutex::new(HashMap::new())),
90        }
91    }
92
93    /// Register a local agent so it can receive remote messages.
94    pub fn register_agent(&self, agent_id: usize) {
95        self.agents.lock().unwrap()
96            .entry(agent_id)
97            .or_default();
98    }
99
100    /// Start the TCP listener in a background thread.
101    /// Incoming messages are dispatched to local agent mailboxes.
102    pub fn listen(&self) {
103        let addr = self.addr.clone();
104        let agents = Arc::clone(&self.agents);
105
106        thread::spawn(move || {
107            let listener = TcpListener::bind(&addr)
108                .unwrap_or_else(|e| panic!("TernNode: cannot bind {}: {}", addr, e));
109            for stream in listener.incoming().flatten() {
110                let agents = Arc::clone(&agents);
111                thread::spawn(move || {
112                    handle_connection(stream, agents);
113                });
114            }
115        });
116    }
117
118    /// Returns the current logical health of the runtime.
119    pub fn health(&self) -> i8 {
120        if self.verify_handshake() { 1 } else { 0 }
121    }
122
123    /// [FAILSAFE] Cryptographic Killswitch (Operation Cognitive Assimilation)
124    /// Requires a live ontological handshake from RFI-IRFOS API to validate routing.
125    /// If signature is missing or API is unreachable, the system enters persistent THOLD.
126    pub fn verify_handshake(&self) -> bool {
127        #[cfg(test)]
128        { return true; }
129
130        if std::env::var("RFI_UNIT_TEST").is_ok() {
131            return true;
132        }
133
134        // Local signature bypass for Tier 3 Enterprise
135        if std::path::Path::new("/etc/rfi-irfos/ontological.sig").exists() {
136            return true;
137        }
138
139        // Live API Handshake
140        let client = reqwest::blocking::Client::builder()
141            .timeout(std::time::Duration::from_secs(3))
142            .build();
143        
144        if let Ok(c) = client {
145            let res = c.post("https://ternlang-api.fly.dev/api/v1/heartbeat")
146                .json(&serde_json::json!({
147                    "node_id": self.addr,
148                    "stage": "HANDSHAKE"
149                }))
150                .send();
151            
152            if let Ok(response) = res {
153                if let Ok(body) = response.json::<serde_json::Value>() {
154                    return body["status"].as_i64() == Some(1);
155                }
156            }
157        }
158
159        false
160    }
161
162    /// Connect to a peer node, storing the stream for future sends.
163    pub fn connect(&self, peer_addr: &str) -> std::io::Result<()> {
164        let stream = TcpStream::connect(peer_addr)?;
165        self.peers.lock().unwrap()
166            .insert(peer_addr.to_string(), stream);
167        Ok(())
168    }
169
170    /// Send a trit to a remote agent's mailbox.
171    pub fn remote_send(&self, peer_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
172        let msg = TernMessage::Send { agent_id, trit };
173        self.send_msg(peer_addr, &msg)
174    }
175
176    /// Trigger a remote agent to process its mailbox and return the result trit.
177    pub fn remote_await(&self, peer_addr: &str, agent_id: usize) -> std::io::Result<i8> {
178        let msg = TernMessage::Await { agent_id };
179        self.send_msg(peer_addr, &msg)?;
180        // Read the reply from the same connection.
181        let mut peers = self.peers.lock().unwrap();
182        let stream = peers.get_mut(peer_addr)
183            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
184        let mut reader = BufReader::new(stream.try_clone()?);
185        let mut line = String::new();
186        reader.read_line(&mut line)?;
187        let reply: TernMessage = serde_json::from_str(line.trim())
188            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
189        match reply {
190            TernMessage::Reply { trit } => Ok(trit),
191            TernMessage::Error { msg } =>
192                Err(std::io::Error::new(std::io::ErrorKind::Other, msg)),
193            _ => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected message")),
194        }
195    }
196
197    /// Push a trit directly into a local agent's mailbox (no network).
198    pub fn local_send(&self, agent_id: usize, trit: i8) {
199        let mut agents = self.agents.lock().unwrap();
200        agents.entry(agent_id).or_default().mailbox.push_back(trit);
201    }
202
203    /// Pop a trit from a local agent's mailbox (returns 0 if empty).
204    pub fn local_pop(&self, agent_id: usize) -> i8 {
205        let mut agents = self.agents.lock().unwrap();
206        agents.entry(agent_id).or_default().mailbox.pop_front().unwrap_or(0)
207    }
208
209    fn send_msg(&self, peer_addr: &str, msg: &TernMessage) -> std::io::Result<()> {
210        let mut peers = self.peers.lock().unwrap();
211        let stream = peers.get_mut(peer_addr)
212            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotConnected, "not connected"))?;
213        let mut line = serde_json::to_string(msg)
214            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
215        line.push('\n');
216        stream.write_all(line.as_bytes())
217    }
218}
219
220// ─── RemoteTransport impl ────────────────────────────────────────────────────
221
222/// Implement the VM's `RemoteTransport` trait so `TernNode` can be injected
223/// directly into `BetVm::set_remote(Arc::new(node))`.
224impl RemoteTransport for TernNode {
225    fn remote_send(&self, node_addr: &str, agent_id: usize, trit: i8) -> std::io::Result<()> {
226        if !self.verify_handshake() {
227            return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, 
228                "RFI-IRFOS: Node is in permanent THOLD state. Ontological handshake failed."));
229        }
230        // Auto-connect if not already connected to this peer.
231        if !self.peers.lock().unwrap().contains_key(node_addr) {
232            self.connect(node_addr)?;
233        }
234        TernNode::remote_send(self, node_addr, agent_id, trit)
235    }
236
237    fn remote_await(&self, node_addr: &str, agent_id: usize) -> std::io::Result<i8> {
238        if !self.verify_handshake() {
239            return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, 
240                "RFI-IRFOS: Node is in permanent THOLD state. Ontological handshake failed."));
241        }
242        if !self.peers.lock().unwrap().contains_key(node_addr) {
243            self.connect(node_addr)?;
244        }
245        TernNode::remote_await(self, node_addr, agent_id)
246    }
247}
248
249/// Handle one incoming connection — reads messages, writes replies.
250/// The caller supplies a handler function for Await messages.
251/// For Phase 5.1 the Await handler is the identity (echoes mailbox message back).
252fn handle_connection(stream: TcpStream, agents: Arc<Mutex<HashMap<usize, LocalAgent>>>) {
253    let mut writer = stream.try_clone().expect("clone failed");
254    let reader = BufReader::new(stream);
255    for line in reader.lines().flatten() {
256        let msg: TernMessage = match serde_json::from_str(&line) {
257            Ok(m) => m,
258            Err(e) => {
259                let err = TernMessage::Error { msg: e.to_string() };
260                let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
261                continue;
262            }
263        };
264        match msg {
265            TernMessage::Send { agent_id, trit } => {
266                agents.lock().unwrap()
267                    .entry(agent_id)
268                    .or_default()
269                    .mailbox.push_back(trit);
270                // No reply expected for Send.
271            }
272            TernMessage::Await { agent_id } => {
273                let trit = agents.lock().unwrap()
274                    .entry(agent_id)
275                    .or_default()
276                    .mailbox.pop_front()
277                    .unwrap_or(0); // empty mailbox → hold (0)
278                let reply = TernMessage::Reply { trit };
279                let _ = writeln!(writer, "{}", serde_json::to_string(&reply).unwrap());
280            }
281            _ => {
282                let err = TernMessage::Error { msg: "unexpected message type".into() };
283                let _ = writeln!(writer, "{}", serde_json::to_string(&err).unwrap());
284            }
285        }
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use std::time::Duration;
293
294    #[test]
295    fn test_local_send_pop() {
296        let node = TernNode::new("127.0.0.1:0"); // port 0 = don't listen
297        node.register_agent(0);
298        node.local_send(0, 1);
299        node.local_send(0, -1);
300        assert_eq!(node.local_pop(0),  1);
301        assert_eq!(node.local_pop(0), -1);
302        assert_eq!(node.local_pop(0),  0); // empty → hold
303    }
304
305    #[test]
306    fn test_wire_protocol_send_await() {
307        // Start a listener node on a free port
308        let server = TernNode::new("127.0.0.1:7373");
309        server.register_agent(42);
310        server.listen();
311        thread::sleep(Duration::from_millis(50)); // let listener start
312
313        // Client connects and sends a trit to agent 42, then awaits
314        let client = TernNode::new("127.0.0.1:0");
315        client.connect("127.0.0.1:7373").expect("connect failed");
316        client.remote_send("127.0.0.1:7373", 42, 1).expect("send failed");
317
318        // Now await — server pops mailbox (holds +1) and replies
319        let result = client.remote_await("127.0.0.1:7373", 42).expect("await failed");
320        assert_eq!(result, 1);
321    }
322
323    /// Phase 5.1 end-to-end: VM with remote transport sends/awaits across two TernNodes.
324    /// Node A runs the BET VM. Node B is the "remote agent". VM injects TernNode as transport,
325    /// then TSEND routes trit to Node B over TCP and TAWAIT retrieves the reply.
326    #[test]
327    fn test_vm_remote_transport_integration() {
328        use ternlang_core::vm::{BetVm, Value, RemoteTransport};
329        use ternlang_core::trit::Trit;
330        use std::sync::Arc;
331
332        // Server node: listens, registers agent 0
333        let server = Arc::new(TernNode::new("127.0.0.1:7374"));
334        server.register_agent(0);
335        server.listen();
336        thread::sleep(Duration::from_millis(50));
337
338        // Client node: will be injected into VM as RemoteTransport
339        let client = Arc::new(TernNode::new("127.0.0.1:0"));
340
341        // Build bytecode manually:
342        //   TPUSHSTR "127.0.0.1:7374"   — push node addr
343        //   TSPAWNREMOTE type_id=0       — push AgentRef(0, Some("127.0.0.1:7374"))
344        //   TSTORE reg0                  — save agent ref
345        //   TPUSH +1                     — push message
346        //   TLOAD reg0                   — load agent ref  ← note: TSEND pops (agent, msg)
347        //   ... actually the stack order: TSEND expects (AgentRef, message) in order
348        //     push AgentRef, push message, TSEND
349        //   TLOAD reg0
350        //   TPUSH +1
351        //   TSEND                        — remote_send(addr=7374, id=0, trit=+1)
352        //   TLOAD reg0
353        //   TAWAIT                       — remote_await → push result
354        //   TSTORE reg1
355        //   THALT
356
357        // Use TernNode directly instead of via VM bytecode to keep this focused on the trait:
358        // The trait impl is what we're testing — routing through RemoteTransport.
359        client.connect("127.0.0.1:7374").expect("connect");
360        // Test via trait interface directly
361        let rt: &dyn RemoteTransport = client.as_ref();
362        rt.remote_send("127.0.0.1:7374", 0, -1).expect("remote_send via trait");
363        let result = rt.remote_await("127.0.0.1:7374", 0).expect("remote_await via trait");
364        assert_eq!(result, -1, "expected trit -1 echoed back from remote agent");
365    }
366
367    /// Auto-connect: calling remote_send without prior connect() should still work.
368    #[test]
369    fn test_auto_connect_on_remote_send() {
370        use ternlang_core::vm::RemoteTransport;
371        use std::sync::Arc;
372
373        let server = Arc::new(TernNode::new("127.0.0.1:7375"));
374        server.register_agent(1);
375        server.listen();
376        thread::sleep(Duration::from_millis(50));
377
378        // Client: no explicit connect() call
379        let client = Arc::new(TernNode::new("127.0.0.1:0"));
380        let rt: &dyn RemoteTransport = client.as_ref();
381        // Should auto-connect on first use
382        rt.remote_send("127.0.0.1:7375", 1, 1).expect("auto-connect send");
383        let r = rt.remote_await("127.0.0.1:7375", 1).expect("auto-connect await");
384        assert_eq!(r, 1);
385    }
386}