use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use pylon_kernel::{AppManifest, ManifestEntity, ManifestField};
use pylon_runtime::Runtime;
use tungstenite::client::IntoClientRequest;
use tungstenite::{client, Message};
fn test_manifest() -> AppManifest {
AppManifest {
manifest_version: 1,
name: "e2e-ws".into(),
version: "0.1.0".into(),
entities: vec![ManifestEntity {
name: "Todo".into(),
fields: vec![
ManifestField {
name: "title".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "done".into(),
field_type: "bool".into(),
optional: false,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
search: None,
crdt: true,
}],
routes: vec![],
queries: vec![],
actions: vec![],
policies: vec![],
auth: Default::default(),
}
}
fn available_port() -> u16 {
static NEXT: AtomicU16 = AtomicU16::new(41_000);
for _ in 0..200 {
let base = NEXT.fetch_add(4, Ordering::Relaxed);
let ok = (0..4)
.all(|off| std::net::TcpListener::bind(format!("127.0.0.1:{}", base + off)).is_ok());
if ok {
return base;
}
}
panic!("no free 4-port block");
}
fn start_server() -> (u16, Arc<Runtime>) {
let port = available_port();
let manifest = test_manifest();
unsafe {
std::env::set_var("PYLON_DEV_MODE", "1");
}
let rt = Arc::new(Runtime::in_memory(manifest).unwrap());
let rt2 = Arc::clone(&rt);
std::thread::spawn(move || {
let _ = pylon_runtime::server::start(rt2, port);
});
for _ in 0..100 {
if TcpStream::connect(format!("127.0.0.1:{port}")).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
for _ in 0..100 {
if TcpStream::connect(format!("127.0.0.1:{}", port + 1)).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
(port, rt)
}
fn http_request(method: &str, url: &str, body: Option<&str>) -> (u16, String) {
let host = url.strip_prefix("http://").unwrap_or(url);
let (host_port, path) = match host.find('/') {
Some(i) => (&host[..i], &host[i..]),
None => (host, "/"),
};
let body_str = body.unwrap_or("");
let request = format!(
"{method} {path} HTTP/1.1\r\nHost: {host_port}\r\nOrigin: http://{host_port}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body_str}",
body_str.len()
);
let mut stream = TcpStream::connect(host_port).expect("connect");
stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
stream.write_all(request.as_bytes()).expect("write");
let mut response = Vec::new();
let _ = stream.read_to_end(&mut response);
let text = String::from_utf8_lossy(&response).to_string();
let status = text
.lines()
.next()
.and_then(|l| l.split_whitespace().nth(1))
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(0);
let body = match text.find("\r\n\r\n") {
Some(i) => text[i + 4..].to_string(),
None => String::new(),
};
(status, body)
}
#[test]
fn http_insert_appears_in_sync_pull() {
let (port, _rt) = start_server();
let base = format!("http://127.0.0.1:{port}");
let (status, body) = http_request(
"POST",
&format!("{base}/api/entities/Todo"),
Some(r#"{"title": "e2e-smoke", "done": false}"#),
);
assert_eq!(status, 201, "insert: {body}");
let (status, body) = http_request("GET", &format!("{base}/api/sync/pull?since=0"), None);
assert_eq!(status, 200, "sync pull: {body}");
let resp: serde_json::Value = serde_json::from_str(&body).unwrap();
let changes = resp["changes"].as_array().expect("changes array");
let todo_insert = changes
.iter()
.find(|c| c["entity"] == "Todo" && c["kind"] == "insert");
assert!(
todo_insert.is_some(),
"sync pull must surface the Todo insert: {resp}"
);
}
#[test]
fn ws_rejects_unauthenticated() {
let (port, _rt) = start_server();
let ws_url = format!("ws://127.0.0.1:{}/", port + 1);
let req = ws_url.into_client_request().expect("ws request");
let connect_result = client::connect(req);
match connect_result {
Ok((mut ws, _)) => {
if let tungstenite::stream::MaybeTlsStream::Plain(ref s) = ws.get_ref() {
s.set_read_timeout(Some(Duration::from_secs(2))).ok();
}
match ws.read() {
Ok(Message::Close(_)) => { }
Ok(other) => panic!("expected Close, got {other:?}"),
Err(_) => { }
}
}
Err(_) => {
}
}
}