phlow_server/
lib.rs

1#![feature(min_specialization)]
2
3use std::collections::HashMap;
4use std::convert::Infallible;
5use std::net::TcpListener;
6use std::sync::Arc;
7use std::thread;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::runtime::Runtime;
12use uuid::Uuid;
13use warp::Filter;
14
15use phlow::{
16    define_extensions, import_extensions, phlow, PhlowObject, PhlowObjectId, PhlowView,
17    PhlowViewSpecificationListingItem,
18};
19use phlow_extensions::CoreExtensions;
20
21mod extensions;
22mod handler;
23
24define_extensions!(PhlowServerExtensions);
25import_extensions!(CoreExtensions, PhlowServerExtensions);
26
27#[derive(Clone, Debug)]
28pub struct PhlowServer(Arc<RwLock<PhlowServerData>>);
29
30#[derive(Debug)]
31struct PhlowServerData {
32    root_object: PhlowObject,
33    objects: HashMap<PhlowObjectId, (PhlowObject, usize)>,
34    session: Uuid,
35    routes: Vec<(String, String)>,
36    server_object_id: PhlowObjectId,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PhlowObjectDescription {
41    id: PhlowObjectId,
42    object_type: String,
43    print_string: String,
44    reference_count: usize,
45    should_auto_release: bool,
46}
47
48#[derive(Debug, Serialize)]
49#[serde(rename_all = "camelCase")]
50pub struct PhlowViewSpecificationDataNode {
51    pub phlow_object: PhlowObjectDescription,
52    pub node_id: PhlowObjectId,
53    pub node_value: Box<dyn PhlowViewSpecificationListingItem>,
54}
55
56impl PhlowObjectDescription {
57    pub fn new(object: &PhlowObject, reference_count: usize) -> Self {
58        Self {
59            id: object.object_id(),
60            object_type: object.phlow_type().type_name().to_string(),
61            print_string: object.to_string(),
62            reference_count,
63            should_auto_release: true,
64        }
65    }
66
67    pub fn without_auto_release(mut self) -> Self {
68        self.should_auto_release = false;
69        self
70    }
71
72    pub fn with_auto_release(mut self) -> Self {
73        self.should_auto_release = true;
74        self
75    }
76}
77
78impl PhlowServer {
79    pub fn new(root_object: PhlowObject) -> Self {
80        let server = Self(Arc::new(RwLock::new(PhlowServerData {
81            root_object: root_object.clone(),
82            objects: Default::default(),
83            session: Uuid::new_v4(),
84            routes: vec![],
85            server_object_id: 0,
86        })));
87
88        let server_phlow_object = phlow!(server.clone());
89        server.0.write().server_object_id = server_phlow_object.object_id();
90
91        server.register_object(root_object);
92        server.register_object(server_phlow_object);
93        server
94    }
95
96    pub fn id(&self) -> PhlowObjectId {
97        self.0.read().server_object_id
98    }
99
100    pub fn session(&self) -> Uuid {
101        self.0.read().session.clone()
102    }
103
104    pub fn add_route(&self, method: &str, new_route: &str) {
105        self.0
106            .write()
107            .routes
108            .push((method.to_string(), new_route.to_string()));
109    }
110
111    pub fn get_routes(&self) -> Vec<(String, String)> {
112        self.0.read().routes.clone()
113    }
114
115    pub fn register_object(&self, object: PhlowObject) -> PhlowObjectDescription {
116        let objects = &mut self.0.write().objects;
117        let mut count = objects
118            .get(&object.object_id())
119            .map_or_else(|| 0, |entry| entry.1);
120
121        count = count + 1;
122        objects.insert(object.object_id(), (object.clone(), count));
123        PhlowObjectDescription::new(&object, count).with_auto_release()
124    }
125
126    pub fn release_object(&self, session: Uuid, object_id: PhlowObjectId) -> Option<PhlowObject> {
127        let mut lock = self.0.write();
128
129        if session != lock.session {
130            return None;
131        }
132
133        let objects = &mut lock.objects;
134        if let Some(entry) = objects.get_mut(&object_id) {
135            let count = entry.1 - 1;
136            if count > 0 {
137                entry.1 = count;
138                None
139            } else {
140                objects.remove(&object_id).map(|entry| entry.0)
141            }
142        } else {
143            None
144        }
145    }
146
147    pub fn root_phlow_views(&self) -> Vec<Box<dyn PhlowView>> {
148        self.0.read().root_object.phlow_views()
149    }
150
151    /// Return descriptions of registered objects.
152    /// Doesn't increase the reference count
153    pub fn inspect_objects(&self) -> Vec<PhlowObjectDescription> {
154        let mut descriptions: Vec<PhlowObjectDescription> = self
155            .0
156            .read()
157            .objects
158            .values()
159            .map(|object| PhlowObjectDescription::new(&object.0, object.1).without_auto_release())
160            .collect();
161
162        descriptions.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
163
164        descriptions
165    }
166
167    pub fn find_object(&self, id: PhlowObjectId) -> Option<PhlowObject> {
168        self.0
169            .read()
170            .objects
171            .get(&id)
172            .map(|object| object.0.clone())
173    }
174
175    /// Return object description for a given object id.
176    /// Increases the reference count
177    pub fn retrieve_object(&self, id: PhlowObjectId) -> Option<PhlowObjectDescription> {
178        let description = self.0.write().objects.get_mut(&id).map(|entry| {
179            let count = entry.1 + 1;
180            entry.1 = count;
181            PhlowObjectDescription::new(&entry.0, entry.1).with_auto_release()
182        });
183
184        description
185    }
186
187    pub fn registered_object_description_by_id_views(
188        &self,
189        id: PhlowObjectId,
190    ) -> Option<Vec<Box<dyn PhlowView>>> {
191        self.find_object(id).map(|object| object.phlow_views())
192    }
193}
194
195fn with_phlow_server(
196    server: PhlowServer,
197) -> impl Filter<Extract = (PhlowServer,), Error = Infallible> + Clone {
198    warp::any().map(move || server.clone())
199}
200
201macro_rules! get_path {
202    ($server:ident, $($pieces:tt)*) => ({
203        $server.add_route("GET", stringify!($($pieces)*));
204        warp::path!($($pieces)*).and(warp::get())
205    });
206}
207
208macro_rules! delete_path {
209    ($server:ident, $($pieces:tt)*) => ({
210        $server.add_route("DELETE", stringify!($($pieces)*));
211        warp::path!($($pieces)*).and(warp::delete())
212    });
213}
214
215pub fn serve(object: PhlowObject) -> thread::JoinHandle<()> {
216    let server = PhlowServer::new(object);
217    let port = get_available_port();
218    if let Some(port) = port {
219        let handle = spawn(server, port);
220        println!("Phlow server running at 127.0.0.1:{}.", port);
221        return handle;
222    }
223    panic!("Failed to find a suitable port")
224}
225
226pub fn spawn(server: PhlowServer, port: u16) -> thread::JoinHandle<()> {
227    let session = get_path!(server, "session")
228        .and(with_phlow_server(server.clone()))
229        .and_then(handler::session);
230
231    let server_id = get_path!(server, "id")
232        .and(with_phlow_server(server.clone()))
233        .and_then(handler::server_id);
234
235    let objects = get_path!(server, "objects")
236        .and(with_phlow_server(server.clone()))
237        .and_then(handler::objects);
238
239    let object = get_path!(server, "objects" / PhlowObjectId)
240        .and(warp::get())
241        .and(with_phlow_server(server.clone()))
242        .and_then(handler::object);
243
244    let object_views = get_path!(server, "objects" / PhlowObjectId / "views")
245        .and(with_phlow_server(server.clone()))
246        .and_then(handler::object_views);
247
248    let object_view = get_path!(server, "objects" / PhlowObjectId / "views" / String)
249        .and(with_phlow_server(server.clone()))
250        .and_then(handler::object_view);
251
252    let object_view_items = get_path!(
253        server,
254        "objects" / PhlowObjectId / "views" / String / "items"
255    )
256    .and(with_phlow_server(server.clone()))
257    .and_then(handler::object_view_items);
258
259    let object_view_sent_item = get_path!(
260        server,
261        "objects" / PhlowObjectId / "views" / String / "send" / PhlowObjectId
262    )
263    .and(with_phlow_server(server.clone()))
264    .and_then(handler::object_view_sent_item);
265
266    let release_object = delete_path!(server, "session" / String / "objects" / PhlowObjectId)
267        .and(with_phlow_server(server.clone()))
268        .and_then(handler::release_object);
269
270    let routes = session
271        .or(server_id)
272        .or(objects)
273        .or(object)
274        .or(release_object)
275        .or(object_views)
276        .or(object_view)
277        .or(object_view_items)
278        .or(object_view_sent_item);
279
280    thread::spawn(move || {
281        let rt = Runtime::new().unwrap();
282        rt.block_on(async move {
283            warp::serve(routes).run(([127, 0, 0, 1], port)).await;
284            ()
285        });
286        ()
287    })
288}
289
290fn get_available_port() -> Option<u16> {
291    (51507..65535).find(|port| port_is_available(*port))
292}
293
294fn port_is_available(port: u16) -> bool {
295    match TcpListener::bind(("127.0.0.1", port)) {
296        Ok(_) => true,
297        Err(_) => false,
298    }
299}