use std::collections::BTreeMap;
use std::io::Write;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use harn_vm::{HostCallBridge, VmError, VmValue};
use serde_json::{json, Value};
use crate::protocol::DapResponse;
/// Upper bound on how long `DapHostBridge::await_reply` blocks waiting for the
/// reply to a single reverse request before reporting a timeout.
const REVERSE_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// Shared table of in-flight reverse requests, keyed by the request's `seq`.
///
/// Each entry holds the channel sender that [`deliver_reply`] uses to hand the
/// reply to the caller blocked on the matching receiver.
pub type PendingMap = Arc<Mutex<BTreeMap<i64, Sender<DapHostCallReply>>>>;
/// Reply to a `harnHostCall` reverse request, routed to the waiting caller by
/// [`deliver_reply`].
#[derive(Debug)]
pub struct DapHostCallReply {
/// When `false`, `dispatch` turns the reply into a thrown VM error.
pub success: bool,
/// Optional JSON payload. On success, an object's `"value"` member (or the
/// whole payload when that member is absent) becomes the call result; on
/// failure it is the fallback error detail when `message` is `None`.
pub body: Option<Value>,
/// Optional text, preferred as the error detail when `success` is `false`.
pub message: Option<String>,
}
/// [`HostCallBridge`] implementation that forwards each host call as a
/// `harnHostCall` reverse request written to `stdout`, then waits for the
/// matching reply to be delivered through `pending`.
#[derive(Clone)]
pub struct DapHostBridge {
/// Counter from which each reverse request's `seq` is drawn.
seq: Arc<AtomicI64>,
/// Writer the framed requests are sent to; the lock is held for a whole
/// frame (header, body, flush).
stdout: Arc<Mutex<Box<dyn Write + Send>>>,
/// In-flight reverse requests awaiting a reply.
pending: PendingMap,
// Never read in this file and always `None` when built through `new`, hence
// the lint allowance.
// NOTE(review): presumably reserved for limiting which operations are
// forwarded — confirm against the intended design.
#[allow(dead_code)]
forward_ops: Option<Arc<Vec<String>>>,
}
impl DapHostBridge {
    /// Creates a bridge that draws request sequence numbers from `seq`, writes
    /// framed requests to `stdout`, and registers reply channels in `pending`.
    ///
    /// `forward_ops` starts out as `None`.
    pub fn new(
        seq: Arc<AtomicI64>,
        stdout: Arc<Mutex<Box<dyn Write + Send>>>,
        pending: PendingMap,
    ) -> Self {
        Self {
            seq,
            stdout,
            pending,
            forward_ops: None,
        }
    }

    /// Returns the current value of the shared sequence counter and advances
    /// it by one.
    fn next_seq(&self) -> i64 {
        self.seq.fetch_add(1, Ordering::SeqCst)
    }

    /// Removes the pending-map entry for `req_seq`, if there is one.
    ///
    /// Best effort: when the map's mutex is poisoned the entry is left alone,
    /// since the caller is already on an error path.
    fn forget_pending(&self, req_seq: i64) {
        if let Ok(mut waiters) = self.pending.lock() {
            waiters.remove(&req_seq);
        }
    }

    /// Writes `body` to the shared writer as one `Content-Length`-framed
    /// message and flushes it.
    ///
    /// The writer lock is held for the whole frame so header and body are
    /// written back to back.
    ///
    /// # Errors
    ///
    /// Returns `VmError::Runtime` when the writer mutex is poisoned or any
    /// write/flush fails.
    fn write_frame(&self, body: &str) -> Result<(), VmError> {
        let mut stdout = self
            .stdout
            .lock()
            .map_err(|_| VmError::Runtime("DapHostBridge stdout mutex poisoned".into()))?;
        let header = format!("Content-Length: {}\r\n\r\n", body.len());
        stdout
            .write_all(header.as_bytes())
            .and_then(|()| stdout.write_all(body.as_bytes()))
            .and_then(|()| stdout.flush())
            .map_err(|e| VmError::Runtime(format!("DAP write: {e}")))
    }

    /// Emits a `harnHostCall` reverse request and returns the receiver on
    /// which its reply will arrive.
    ///
    /// The request carries `capability`, `operation` and `params_json` under
    /// `arguments`, and a fresh `seq` that keys the reply channel in the
    /// pending map.
    ///
    /// # Errors
    ///
    /// Returns `VmError::Runtime` when encoding fails, a mutex is poisoned, or
    /// the frame cannot be written. If the frame could not be sent, the
    /// pending entry registered for it is removed again so it does not linger
    /// for a request that never went out.
    fn send_reverse_request(
        &self,
        capability: &str,
        operation: &str,
        params_json: Value,
    ) -> Result<Receiver<DapHostCallReply>, VmError> {
        let req_seq = self.next_seq();

        // Encode first: an encoding failure then cannot strand a pending entry.
        let frame = json!({
            "seq": req_seq,
            "type": "request",
            "command": "harnHostCall",
            "arguments": {
                "capability": capability,
                "operation": operation,
                "params": params_json,
            },
        });
        let body = serde_json::to_string(&frame)
            .map_err(|e| VmError::Runtime(format!("DAP encode: {e}")))?;

        // Register before writing: `deliver_reply` discards replies whose seq
        // is not in the map, so the entry must exist before a reply can come.
        let (tx, rx) = channel();
        self.pending
            .lock()
            .map_err(|_| VmError::Runtime("DapHostBridge pending map poisoned".into()))?
            .insert(req_seq, tx);

        let sent = self.write_frame(&body);
        if sent.is_err() {
            // No reply can be expected for a request that was never sent.
            self.forget_pending(req_seq);
        }
        sent.map(|()| rx)
    }

    /// Blocks until the reply for a previously sent reverse request arrives on
    /// `rx`, for at most `REVERSE_REQUEST_TIMEOUT`.
    ///
    /// `capability` and `operation` are only used to label the error message.
    ///
    /// # Errors
    ///
    /// Returns `VmError::Thrown` carrying a string: a timeout message when no
    /// reply arrived in time, or a "channel closed" message when the sending
    /// half was dropped without a reply being sent.
    fn await_reply(
        &self,
        rx: Receiver<DapHostCallReply>,
        capability: &str,
        operation: &str,
    ) -> Result<DapHostCallReply, VmError> {
        // NOTE(review): on timeout the pending-map entry for this request is
        // not removed here (the seq is not available to this method); it stays
        // registered until a reply for it is delivered.
        rx.recv_timeout(REVERSE_REQUEST_TIMEOUT).map_err(|err| {
            let detail = match err {
                std::sync::mpsc::RecvTimeoutError::Timeout => format!(
                    "harnHostCall timed out after {}s ({capability}.{operation})",
                    REVERSE_REQUEST_TIMEOUT.as_secs()
                ),
                // The sender was dropped without replying; this returns at
                // once, so calling it a timeout would be misleading.
                std::sync::mpsc::RecvTimeoutError::Disconnected => format!(
                    "harnHostCall reply channel closed before a response arrived \
                     ({capability}.{operation})"
                ),
            };
            VmError::Thrown(VmValue::String(std::rc::Rc::from(detail)))
        })
    }
}
impl HostCallBridge for DapHostBridge {
fn dispatch(
&self,
capability: &str,
operation: &str,
params: &BTreeMap<String, VmValue>,
) -> Result<Option<VmValue>, VmError> {
let params_json = vm_dict_to_json(params);
let rx = self.send_reverse_request(capability, operation, params_json)?;
let reply = self.await_reply(rx, capability, operation)?;
if !reply.success {
let detail = reply
.message
.or_else(|| reply.body.as_ref().map(|v| v.to_string()))
.unwrap_or_else(|| format!("{capability}.{operation} failed"));
return Err(VmError::Thrown(VmValue::String(std::rc::Rc::from(detail))));
}
let value = match reply.body {
Some(Value::Object(mut map)) => match map.remove("value") {
Some(v) => json_to_vm_value(v),
None => json_to_vm_value(Value::Object(map)),
},
Some(other) => json_to_vm_value(other),
None => VmValue::Nil,
};
Ok(Some(value))
}
}
/// Hands `reply` to whoever is waiting on reverse request `request_seq`.
///
/// The matching sender is taken out of `pending` (so each request is answered
/// at most once) and the lock is released before sending. Nothing happens when
/// the map's mutex is poisoned or no entry exists for `request_seq`; a send
/// failure (the waiter already gave up) is ignored.
pub fn deliver_reply(pending: &PendingMap, request_seq: i64, reply: DapHostCallReply) {
    let waiter = match pending.lock() {
        Ok(mut waiters) => waiters.remove(&request_seq),
        Err(_) => return,
    };
    if let Some(sender) = waiter {
        // The receiver may be gone already; that is not an error here.
        let _ = sender.send(reply);
    }
}
/// Converts a VM parameter dictionary into a JSON object, converting each
/// value with `vm_value_to_json` and keeping the keys as they are.
fn vm_dict_to_json(params: &BTreeMap<String, VmValue>) -> Value {
    let fields: serde_json::Map<String, Value> = params
        .iter()
        .map(|(key, val)| (key.clone(), vm_value_to_json(val)))
        .collect();
    Value::Object(fields)
}
/// Converts a VM value into its JSON counterpart.
///
/// Nil, booleans, integers, strings, lists and dicts map to the matching JSON
/// kinds (recursively for containers). A float that `serde_json` cannot
/// represent as a number becomes `null`. Any other variant is rendered through
/// its `display()` output as a JSON string.
fn vm_value_to_json(value: &VmValue) -> Value {
    match value {
        VmValue::Nil => Value::Null,
        VmValue::Bool(flag) => Value::Bool(*flag),
        VmValue::Int(int) => Value::Number(serde_json::Number::from(*int)),
        VmValue::Float(float) => match serde_json::Number::from_f64(*float) {
            Some(number) => Value::Number(number),
            // `from_f64` rejected the value; fall back to null.
            None => Value::Null,
        },
        VmValue::String(text) => Value::String(text.to_string()),
        VmValue::List(items) => {
            let mut elements = Vec::with_capacity(items.len());
            for item in items.iter() {
                elements.push(vm_value_to_json(item));
            }
            Value::Array(elements)
        }
        VmValue::Dict(entries) => Value::Object(
            entries
                .iter()
                .map(|(key, val)| (key.clone(), vm_value_to_json(val)))
                .collect(),
        ),
        other => Value::String(other.display().to_string()),
    }
}
/// Converts a JSON value into a VM value, consuming it.
///
/// Numbers become `Int` when they fit an `i64`, otherwise `Float` when an
/// `f64` view exists, otherwise `Nil`. Arrays and objects are converted
/// recursively into `List` and `Dict`.
fn json_to_vm_value(value: Value) -> VmValue {
    match value {
        Value::Null => VmValue::Nil,
        Value::Bool(flag) => VmValue::Bool(flag),
        Value::Number(number) => number
            .as_i64()
            .map(VmValue::Int)
            .or_else(|| number.as_f64().map(VmValue::Float))
            .unwrap_or(VmValue::Nil),
        Value::String(text) => VmValue::String(std::rc::Rc::from(text)),
        Value::Array(elements) => {
            let items = elements.into_iter().map(json_to_vm_value).collect();
            VmValue::List(std::rc::Rc::new(items))
        }
        Value::Object(fields) => {
            let mut entries: BTreeMap<String, VmValue> = BTreeMap::new();
            for (key, val) in fields {
                entries.insert(key, json_to_vm_value(val));
            }
            VmValue::Dict(std::rc::Rc::new(entries))
        }
    }
}
/// Serialises `response` to JSON and writes it to `stdout` as one
/// `Content-Length`-framed message, flushing afterwards.
///
/// # Errors
///
/// Returns an `std::io::Error` when encoding fails, the writer mutex is
/// poisoned, or any write/flush fails.
#[allow(dead_code)]
pub fn write_dap_response(
    stdout: &Arc<Mutex<Box<dyn Write + Send>>>,
    response: &DapResponse,
) -> std::io::Result<()> {
    let payload = match serde_json::to_string(response) {
        Ok(text) => text,
        Err(err) => return Err(std::io::Error::other(format!("encode: {err}"))),
    };
    let prefix = format!("Content-Length: {}\r\n\r\n", payload.len());

    // Hold the lock across header, body and flush so the frame stays contiguous.
    let Ok(mut sink) = stdout.lock() else {
        return Err(std::io::Error::other("stdout mutex poisoned"));
    };
    sink.write_all(prefix.as_bytes())?;
    sink.write_all(payload.as_bytes())?;
    sink.flush()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// `Write` sink that appends everything into a shared byte buffer so the
    /// test can inspect what the bridge wrote.
    struct SharedWriter(Arc<Mutex<Vec<u8>>>);

    impl Write for SharedWriter {
        fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(data);
            Ok(data.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Parses the JSON body that follows the first `\r\n\r\n` header
    /// terminator in `bytes`. Expects exactly one frame in the buffer.
    fn parse_lsp_frame(bytes: &[u8]) -> serde_json::Value {
        let body_start = bytes
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .expect("LSP framing terminator")
            + 4;
        serde_json::from_slice(&bytes[body_start..]).expect("valid JSON body")
    }

    /// Builds a bridge wired to an in-memory writer and a fresh pending map,
    /// returning all three so tests can observe output and inject replies.
    fn rig() -> (DapHostBridge, Arc<Mutex<Vec<u8>>>, PendingMap) {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let stdout: Arc<Mutex<Box<dyn Write + Send>>> =
            Arc::new(Mutex::new(Box::new(SharedWriter(Arc::clone(&buf)))));
        let pending: PendingMap = Arc::new(Mutex::new(BTreeMap::new()));
        let seq = Arc::new(AtomicI64::new(1));
        let bridge = DapHostBridge::new(seq, stdout, Arc::clone(&pending));
        (bridge, buf, pending)
    }

    /// Polls `pending` (up to 200 times, 5 ms apart) until a request is
    /// registered and returns its seq; panics if none ever shows up.
    fn await_pending_seq(pending: &PendingMap) -> i64 {
        for _ in 0..200 {
            {
                // Scope the guard so the bridge is not blocked while we sleep.
                let guard = pending.lock().unwrap();
                if let Some(&k) = guard.keys().next() {
                    return k;
                }
            }
            thread::sleep(Duration::from_millis(5));
        }
        panic!("bridge never registered a pending reverse request");
    }

    /// Spawns a thread that waits for the bridge to register a request and
    /// then answers it with `reply`.
    fn spawn_replier(pending: PendingMap, reply: DapHostCallReply) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let req_seq = await_pending_seq(&pending);
            deliver_reply(&pending, req_seq, reply);
        })
    }

    #[test]
    fn dispatch_emits_reverse_request_and_unwraps_value_body() {
        let (bridge, buf, pending) = rig();
        let helper = spawn_replier(
            Arc::clone(&pending),
            DapHostCallReply {
                success: true,
                body: Some(serde_json::json!({"value": "hello"})),
                message: None,
            },
        );
        let mut params = BTreeMap::new();
        params.insert("path".into(), VmValue::String(std::rc::Rc::from("foo")));
        let result = bridge
            .dispatch("workspace", "read_text", &params)
            .expect("dispatch ok")
            .expect("Some");
        helper.join().expect("helper panicked");
        match result {
            VmValue::String(s) => assert_eq!(&*s, "hello"),
            other => panic!("expected String, got {other:?}"),
        }
        let bytes = buf.lock().unwrap().clone();
        let frame = parse_lsp_frame(&bytes);
        assert_eq!(frame["type"], "request");
        assert_eq!(frame["command"], "harnHostCall");
        assert_eq!(frame["arguments"]["capability"], "workspace");
        assert_eq!(frame["arguments"]["operation"], "read_text");
        assert_eq!(frame["arguments"]["params"]["path"], "foo");
    }

    #[test]
    fn dispatch_failure_throws_with_message() {
        let (bridge, _buf, pending) = rig();
        let helper = spawn_replier(
            Arc::clone(&pending),
            DapHostCallReply {
                success: false,
                body: None,
                message: Some("not implemented".to_string()),
            },
        );
        let result = bridge.dispatch("workspace", "missing_op", &BTreeMap::new());
        helper.join().expect("helper panicked");
        match result {
            Err(VmError::Thrown(VmValue::String(s))) => {
                assert!(s.contains("not implemented"), "unexpected error: {s}");
            }
            other => panic!("expected Thrown('not implemented'), got {other:?}"),
        }
    }

    #[test]
    fn dispatch_returns_whole_body_when_value_key_missing() {
        let (bridge, _buf, pending) = rig();
        let helper = spawn_replier(
            Arc::clone(&pending),
            DapHostCallReply {
                success: true,
                body: Some(serde_json::json!({"roots": ["/tmp/a"]})),
                message: None,
            },
        );
        let result = bridge
            .dispatch("session", "active_roots", &BTreeMap::new())
            .expect("dispatch ok")
            .expect("Some");
        helper.join().expect("helper panicked");
        match result {
            VmValue::Dict(map) => {
                let roots = map.get("roots").expect("roots key");
                match roots {
                    VmValue::List(items) => assert_eq!(items.len(), 1),
                    other => panic!("expected list, got {other:?}"),
                }
            }
            other => panic!("expected Dict, got {other:?}"),
        }
    }
}