1#![feature(min_specialization)]
2
3use std::collections::HashMap;
4use std::convert::Infallible;
5use std::net::TcpListener;
6use std::sync::Arc;
7use std::thread;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::runtime::Runtime;
12use uuid::Uuid;
13use warp::Filter;
14
15use phlow::{
16 define_extensions, import_extensions, phlow, PhlowObject, PhlowObjectId, PhlowView,
17 PhlowViewSpecificationListingItem,
18};
19use phlow_extensions::CoreExtensions;
20
21mod extensions;
22mod handler;
23
24define_extensions!(PhlowServerExtensions);
25import_extensions!(CoreExtensions, PhlowServerExtensions);
26
27#[derive(Clone, Debug)]
28pub struct PhlowServer(Arc<RwLock<PhlowServerData>>);
29
30#[derive(Debug)]
31struct PhlowServerData {
32 root_object: PhlowObject,
33 objects: HashMap<PhlowObjectId, (PhlowObject, usize)>,
34 session: Uuid,
35 routes: Vec<(String, String)>,
36 server_object_id: PhlowObjectId,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PhlowObjectDescription {
41 id: PhlowObjectId,
42 object_type: String,
43 print_string: String,
44 reference_count: usize,
45 should_auto_release: bool,
46}
47
48#[derive(Debug, Serialize)]
49#[serde(rename_all = "camelCase")]
50pub struct PhlowViewSpecificationDataNode {
51 pub phlow_object: PhlowObjectDescription,
52 pub node_id: PhlowObjectId,
53 pub node_value: Box<dyn PhlowViewSpecificationListingItem>,
54}
55
56impl PhlowObjectDescription {
57 pub fn new(object: &PhlowObject, reference_count: usize) -> Self {
58 Self {
59 id: object.object_id(),
60 object_type: object.phlow_type().type_name().to_string(),
61 print_string: object.to_string(),
62 reference_count,
63 should_auto_release: true,
64 }
65 }
66
67 pub fn without_auto_release(mut self) -> Self {
68 self.should_auto_release = false;
69 self
70 }
71
72 pub fn with_auto_release(mut self) -> Self {
73 self.should_auto_release = true;
74 self
75 }
76}
77
78impl PhlowServer {
79 pub fn new(root_object: PhlowObject) -> Self {
80 let server = Self(Arc::new(RwLock::new(PhlowServerData {
81 root_object: root_object.clone(),
82 objects: Default::default(),
83 session: Uuid::new_v4(),
84 routes: vec![],
85 server_object_id: 0,
86 })));
87
88 let server_phlow_object = phlow!(server.clone());
89 server.0.write().server_object_id = server_phlow_object.object_id();
90
91 server.register_object(root_object);
92 server.register_object(server_phlow_object);
93 server
94 }
95
96 pub fn id(&self) -> PhlowObjectId {
97 self.0.read().server_object_id
98 }
99
100 pub fn session(&self) -> Uuid {
101 self.0.read().session.clone()
102 }
103
104 pub fn add_route(&self, method: &str, new_route: &str) {
105 self.0
106 .write()
107 .routes
108 .push((method.to_string(), new_route.to_string()));
109 }
110
111 pub fn get_routes(&self) -> Vec<(String, String)> {
112 self.0.read().routes.clone()
113 }
114
115 pub fn register_object(&self, object: PhlowObject) -> PhlowObjectDescription {
116 let objects = &mut self.0.write().objects;
117 let mut count = objects
118 .get(&object.object_id())
119 .map_or_else(|| 0, |entry| entry.1);
120
121 count = count + 1;
122 objects.insert(object.object_id(), (object.clone(), count));
123 PhlowObjectDescription::new(&object, count).with_auto_release()
124 }
125
126 pub fn release_object(&self, session: Uuid, object_id: PhlowObjectId) -> Option<PhlowObject> {
127 let mut lock = self.0.write();
128
129 if session != lock.session {
130 return None;
131 }
132
133 let objects = &mut lock.objects;
134 if let Some(entry) = objects.get_mut(&object_id) {
135 let count = entry.1 - 1;
136 if count > 0 {
137 entry.1 = count;
138 None
139 } else {
140 objects.remove(&object_id).map(|entry| entry.0)
141 }
142 } else {
143 None
144 }
145 }
146
147 pub fn root_phlow_views(&self) -> Vec<Box<dyn PhlowView>> {
148 self.0.read().root_object.phlow_views()
149 }
150
151 pub fn inspect_objects(&self) -> Vec<PhlowObjectDescription> {
154 let mut descriptions: Vec<PhlowObjectDescription> = self
155 .0
156 .read()
157 .objects
158 .values()
159 .map(|object| PhlowObjectDescription::new(&object.0, object.1).without_auto_release())
160 .collect();
161
162 descriptions.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
163
164 descriptions
165 }
166
167 pub fn find_object(&self, id: PhlowObjectId) -> Option<PhlowObject> {
168 self.0
169 .read()
170 .objects
171 .get(&id)
172 .map(|object| object.0.clone())
173 }
174
175 pub fn retrieve_object(&self, id: PhlowObjectId) -> Option<PhlowObjectDescription> {
178 let description = self.0.write().objects.get_mut(&id).map(|entry| {
179 let count = entry.1 + 1;
180 entry.1 = count;
181 PhlowObjectDescription::new(&entry.0, entry.1).with_auto_release()
182 });
183
184 description
185 }
186
187 pub fn registered_object_description_by_id_views(
188 &self,
189 id: PhlowObjectId,
190 ) -> Option<Vec<Box<dyn PhlowView>>> {
191 self.find_object(id).map(|object| object.phlow_views())
192 }
193}
194
195fn with_phlow_server(
196 server: PhlowServer,
197) -> impl Filter<Extract = (PhlowServer,), Error = Infallible> + Clone {
198 warp::any().map(move || server.clone())
199}
200
/// Records the stringified path on `$server` as a `GET` route and expands to a filter that
/// matches that path for GET requests.
macro_rules! get_path {
    ($server:ident, $($segments:tt)*) => {{
        $server.add_route("GET", stringify!($($segments)*));
        warp::path!($($segments)*).and(warp::get())
    }};
}
207
/// Records the stringified path on `$server` as a `DELETE` route and expands to a filter
/// that matches that path for DELETE requests.
macro_rules! delete_path {
    ($server:ident, $($segments:tt)*) => {{
        $server.add_route("DELETE", stringify!($($segments)*));
        warp::path!($($segments)*).and(warp::delete())
    }};
}
214
215pub fn serve(object: PhlowObject) -> thread::JoinHandle<()> {
216 let server = PhlowServer::new(object);
217 let port = get_available_port();
218 if let Some(port) = port {
219 let handle = spawn(server, port);
220 println!("Phlow server running at 127.0.0.1:{}.", port);
221 return handle;
222 }
223 panic!("Failed to find a suitable port")
224}
225
226pub fn spawn(server: PhlowServer, port: u16) -> thread::JoinHandle<()> {
227 let session = get_path!(server, "session")
228 .and(with_phlow_server(server.clone()))
229 .and_then(handler::session);
230
231 let server_id = get_path!(server, "id")
232 .and(with_phlow_server(server.clone()))
233 .and_then(handler::server_id);
234
235 let objects = get_path!(server, "objects")
236 .and(with_phlow_server(server.clone()))
237 .and_then(handler::objects);
238
239 let object = get_path!(server, "objects" / PhlowObjectId)
240 .and(warp::get())
241 .and(with_phlow_server(server.clone()))
242 .and_then(handler::object);
243
244 let object_views = get_path!(server, "objects" / PhlowObjectId / "views")
245 .and(with_phlow_server(server.clone()))
246 .and_then(handler::object_views);
247
248 let object_view = get_path!(server, "objects" / PhlowObjectId / "views" / String)
249 .and(with_phlow_server(server.clone()))
250 .and_then(handler::object_view);
251
252 let object_view_items = get_path!(
253 server,
254 "objects" / PhlowObjectId / "views" / String / "items"
255 )
256 .and(with_phlow_server(server.clone()))
257 .and_then(handler::object_view_items);
258
259 let object_view_sent_item = get_path!(
260 server,
261 "objects" / PhlowObjectId / "views" / String / "send" / PhlowObjectId
262 )
263 .and(with_phlow_server(server.clone()))
264 .and_then(handler::object_view_sent_item);
265
266 let release_object = delete_path!(server, "session" / String / "objects" / PhlowObjectId)
267 .and(with_phlow_server(server.clone()))
268 .and_then(handler::release_object);
269
270 let routes = session
271 .or(server_id)
272 .or(objects)
273 .or(object)
274 .or(release_object)
275 .or(object_views)
276 .or(object_view)
277 .or(object_view_items)
278 .or(object_view_sent_item);
279
280 thread::spawn(move || {
281 let rt = Runtime::new().unwrap();
282 rt.block_on(async move {
283 warp::serve(routes).run(([127, 0, 0, 1], port)).await;
284 ()
285 });
286 ()
287 })
288}
289
290fn get_available_port() -> Option<u16> {
291 (51507..65535).find(|port| port_is_available(*port))
292}
293
/// Reports whether a TCP listener can currently be bound to `127.0.0.1:port`.
///
/// The probing listener is dropped before returning, so the port is released again.
fn port_is_available(port: u16) -> bool {
    TcpListener::bind(("127.0.0.1", port)).is_ok()
}