1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
use bytes::Bytes; use futures::{join, pin_mut, select, Future, FutureExt, Stream}; use hopscotch::{self, ArcK}; use serde::Deserialize; use serde_json::Value; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; use uuid::Uuid; pub mod content; pub mod query; pub mod subscription; pub use content::Content; pub use query::Queries; pub use subscription::Subscribers; pub use subscription::{Event, Subscription}; /// A proxy for a page in the browser, encapsulating all the state and /// connections necessary to set its content, listen for events, and evaluate /// JavaScript within it. At any given time, a `Page` object may be connected to /// zero, one, or more actual browser windows, each of which will exactly mirror /// whatever is being done to the `Page` object. /// /// To actually connect a `Page` to a real browser window, this library must be /// used in concert with a web server to serve JavaScript that communicates with /// the server to consume its streams of `Command`s and send it back browser /// events. None of this functionality is contained in this crate, but for ease /// of description, we will refer to the abstractions provided by a `Page` in /// terms of its interactions with a real browser, even though this must be /// enabled elsewhere (canonically, in the Myxine server). /// /// A `Page` can correspond to dynamic or static content, and can be changed /// between those two at will, though changing a page from static content to /// dynamic content will necessitate a manual browser refresh to be noticeable, /// since static content is served without any "magic" to make it automatically /// change. #[derive(Debug)] pub struct Page { /// The current content of the page, including connections to any browser /// windows viewing that page. content: Mutex<Content>, /// The current subscribers to events in the page, including one-off pending /// requests for a particular event and persistent streams of many events. subscribers: Mutex<Subscribers>, /// The current pending JavaScript queries to the page. queries: Mutex<Queries<(String, bool), Result<Value, String>>>, /// The current buffer of events in the page, paired with the maximum buffer /// size limiter. events: RwLock<(BufferParams, EventBuffer)>, // TODO: preload events and use u16 tags } /// The event buffer stores recent events, and because it's implemented as a /// hopscotch queue, allows quickly querying things like "next event with one of /// these tags, after this index." type EventBuffer = hopscotch::Queue<String, Arc<Event>, ArcK>; /// The parameters defining the memory behavior of the event buffer. #[derive(Debug, Clone)] struct BufferParams { /// How many events does the buffer hold when full? size: usize, /// When the buffer is in the midst of being downsized, how many old events /// does it drop for each new event it receives? deallocation_rate: usize, } /// The possible responses from a page: either an event happened, or the result /// of evaluating an expression was sent back. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase", tag = "type")] pub enum Response { /// The browser sent back an event that occurred. Event(Event), /// The browser sent back the result of some JavaScript evaluation. EvalResult { /// The unique id of the JavaScript evaluation query. id: Uuid, /// The result of the query: either some successful JSON serialization /// of a JavaScript value, or the string representation of some /// JavaScript error. result: Result<Value, String>, }, } /// The ways in which a page can be refreshed. #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] pub enum RefreshMode { /// Reload the entire page via `window.location.reload()`. FullReload, /// Set the body of the page via `document.body = ...` (mainly to be used /// for debugging purposes). SetBody, /// The default: diff the update against the current page contents to update /// only the parts of the DOM which require it. Diff, } impl Page { /// Make a new empty (dynamic) page with a given internal event buffer size. /// This can be altered later by means of the `set_buffer_size` method. pub fn new(buffer_size: usize) -> Page { let params = BufferParams { size: buffer_size, deallocation_rate: 2, }; Page { content: Mutex::new(Content::new()), subscribers: Mutex::new(Subscribers::new()), queries: Mutex::new(Queries::new()), events: RwLock::new((params, hopscotch::Queue::with_capacity(buffer_size))), } } /// Test if this page is empty, where "empty" means that it is dynamic, with /// an empty title, empty body, and no subscribers waiting on its page /// events. pub async fn is_empty(&self) -> bool { let (content, subscribers, queries) = join!( async { self.content.lock().await }, async { self.subscribers.lock().await }, async { self.queries.lock().await }, ); content.is_empty() && subscribers.is_empty() && queries.is_empty() } /// Get the stream of commands to this page, starting at the moment this /// function was called. These commands provide all the dynamic information /// necessary for a consumer of this stream to reconstruct the exact state /// of the page at any given moment. If the page is static, returns `None`. pub async fn commands(&self) -> Option<impl Stream<Item = content::Command>> { let mut content = self.content.lock().await; // Get the command stream (must come first here so we make sure to // capture these commands below). let commands = content.commands(); // Re-send all pending evaluations to the page, in case it was static or // non-existent before. match &mut *content { Content::Dynamic { other_commands, .. } => { for (id, (script, statement_mode)) in self.queries.lock().await.pending() { let command = content::Command::Evaluate { script: script.clone(), statement_mode: *statement_mode, id: *id, }; other_commands.send(command).unwrap_or(0); } } Content::Static { .. } => { // Don't do anything if the content is static, because there's // no way to evaluate pending JavaScript on a static page } } commands } /// If the page is static, return a pair of its `Content-Type` (if it has /// one), and its static content. If the page is dynamic, returns `None`. pub async fn static_content(&self) -> Option<(Option<String>, Bytes)> { match &*self.content.lock().await { Content::Dynamic { .. } => None, Content::Static { content_type, raw_contents, } => Some((content_type.clone(), raw_contents.clone())), } } /// Get a stream of events which match the specified [`Subscription`], /// starting at the next event after the moment this function is called. pub async fn events(&self, subscription: Subscription) -> impl Stream<Item = Arc<Event>> { let mut subscribers = self.subscribers.lock().await; subscribers.add_subscriber(subscription) } /// Get a specific event, at or after a particular moment in time, given a /// subscription specification that it must match. If the event is available /// in the buffer, this returns immediately, otherwise it may block until /// the event is available. If the request is for a moment which lies /// *before* the buffer, returns `Err(u64)`, to indicate the canonical /// moment for the event requested. The user-agent is responsible for /// retrying a request redirected to this moment. pub async fn event_after( &self, subscription: Subscription, moment: u64, ) -> Result<(u64, Arc<Event>), u64> { let lagged = { // Scope for ensuring we drop events read-lock let events = &self.events.read().await.1; // Determine if the event was lagging the buffer let earliest_index = if let Some(earliest) = events.earliest() { earliest.index() } else { events.next_index() }; let lagged = moment < earliest_index; match &subscription { Subscription::Universal => { if lagged { // Immediately report lag: since any event will do, the // place to retry (after the user-agent re-issues the // request) should be the earliest index represented. return Err(earliest_index); } else if let Some(found) = events.get(moment) { // If no lag, and there is an event to send, send back // the body we desire, and its found index. return Ok((moment, found.value().clone())); } else { // Block until we get a matching event (see below). } } Subscription::Specific(tags) => { if let Some(found) = events.after(moment, tags) { if lagged { // Immediately report lag: since it was found, the // place to retry (after the user-agent re-issues // the request) should be this very same index. return Err(found.index()); } else { // If no lag, send back the body we desire, and its // found index. return Ok((found.index(), found.value().clone())); } } else { // Block until we get a matching event (see below). } } } // This computed value will be used below to determine whether to // report that lag occurred if there was both (1) lag, and (2) no // matching event yet. lagged }; // If we couldn't find it in the buffered events, then wait for it: let result = { // Scope for ensuring we drop subscriber lock before awaiting the // result of the subscription let mut subscribers = self.subscribers.lock().await; subscribers.add_one_off(subscription, moment, lagged) }; result.await } /// Get the next event for a particular subscription, bypassing the buffer /// of pre-existing events and blocking until a new event arrives after the /// time this function was called. pub async fn next_event(&self, subscription: Subscription) -> (u64, Arc<Event>) { let result = { // Scope for ensuring we drop subscriber lock before awaiting the // result of the subscription let mut subscribers = self.subscribers.lock().await; subscribers.add_one_off(subscription, 0, false) }; result.await.expect("Page::next_event can't lag") } /// Send an event to all subscribers to events on this page which are /// waiting on an event of this kind. pub async fn send_event(&self, event: Event) { let event = Arc::new(event); let ( BufferParams { size: max_buffer_size, deallocation_rate, }, ref mut events, ) = &mut *self.events.write().await; self.subscribers .lock() .await .send_event(events.next_index(), event.clone()); events.push(event.event.clone(), event); // Trim the event buffer by a bounded amount if it's larger than the max // length. The bound prevents a sudden large decrease in max buffer // length from causing a long lag -- instead, the pruning is amortized // across many events, at the cost of not immediately freeing the // memory. for i in 0..*deallocation_rate { if events.len() > *max_buffer_size { events.pop(); } else { // Shrink to fit only if we popped more than once: that's to // say, we only shrink the underlying buffer if we've actually // decreased its size in this method. if i > 1 && events.len() == *max_buffer_size { events.shrink_to_fit(); } break; } } } /// Tell all clients to change the title and body, if necessary. This /// converts the page into a dynamic page, overwriting any static content /// that previously existed, if any. pub async fn set_content( &self, new_title: impl Into<String>, new_body: impl Into<String>, refresh: RefreshMode, ) { self.content.lock().await.set(new_title, new_body, refresh); } /// Set the contents of the page to be a static raw set of bytes with no /// self-refreshing functionality. All clients will be told to refresh their /// page to load the new static content (which will not be able to update /// itself until a client refreshes their page again). pub async fn set_static(&self, content_type: Option<String>, raw_contents: Bytes) { let mut content = self.content.lock().await; content.set_static(content_type, raw_contents) } /// Clear the page entirely, removing all subscribers and resetting the page /// title and body to empty. pub async fn clear(&self) { let subscribers = &mut *self.subscribers.lock().await; *subscribers = Subscribers::new(); let content = &mut *self.content.lock().await; content.set("", "", RefreshMode::Diff); } /// Tell the page, if it is dynamic, to refresh its content in full from the /// server. #[allow(dead_code)] pub async fn refresh(&self) { self.content.lock().await.refresh(RefreshMode::FullReload) } /// Tell the page to evaluate a given piece of JavaScript, as either an /// expression or a statement, with an optional explicit timeout (defaults /// to the timeout duration specified when this page was constructed). pub async fn evaluate( &self, expression: &str, statement_mode: bool, abort: impl Future<Output = ()>, ) -> Option<Result<Value, String>> { let script = expression.to_string(); let (id, fut) = self .queries .lock() .await .request((script.clone(), statement_mode)); match *self.content.lock().await { Content::Dynamic { ref mut other_commands, .. } => { let command = content::Command::Evaluate { script, statement_mode, id, }; other_commands.send(command).unwrap_or(0); } Content::Static { .. } => { // Do nothing -- if the page is ever reloaded as dynamic, this // query will be re-sent to it if it hasn't yet been answered } } let abort = abort.fuse(); let fut = fut.fuse(); pin_mut!(fut, abort); select! { result = fut => Some(result.expect("Internal error: query handle dropped before response")), () = abort => { self.queries.lock().await.cancel(id); None } } } /// Notify waiting clients of the result to some in-page JavaScript /// evaluation they have requested, either with an error or a valid /// response. pub async fn send_eval_result(&self, id: Uuid, result: Result<Value, String>) { let _ = self.queries.lock().await.respond(id, result); } /// Set the size of the page's internal event buffer. This will not actually /// de-allocate memory immediately, but rather will mean that future /// incoming events will incrementally trigger downsizing of the event /// buffer. When the event buffer actually reaches its new target size, one /// single re-allocation will occur to free the used memory. /// /// The `deallocation_rate` parameter determines how aggressively the buffer /// size is changed. A value of 2 means that for every incoming event, 2 old /// events are freed; a value of usize::max_value() means that the buffer is /// instantly resized at the next incoming event; anything in between cause /// incremental behavior, freeing N old events for each 1 new event. A /// deallocation rate of 0 or 1 is not valid, and will cause this function /// to panic. pub async fn set_buffer_size(&self, size: usize, deallocation_rate: usize) { if deallocation_rate >= 2 { self.events.write().await.0 = BufferParams { size, deallocation_rate, }; } else { panic!("Cannot set buffer size with a deallocation rate of < 2"); } } }