daicon_web/
lib.rs

1use std::{cell::RefCell, collections::HashMap, ops::Range, rc::Rc};
2
3use anyhow::{Context as _, Error};
4use daicon::protocol::{FileAction, FileMessage, FileRead, ReadResult};
5use js_sys::{ArrayBuffer, Uint8Array};
6use stewart::{Actor, Context, Options, Sender, State, World};
7use tracing::{event, instrument, Level};
8use uuid::Uuid;
9use wasm_bindgen::JsCast;
10use wasm_bindgen_futures::{spawn_local, JsFuture};
11use web_sys::{Headers, Request, RequestInit, RequestMode, Response};
12
13#[instrument("SystemFile", skip_all)]
14pub fn open_fetch_file(
15    ctx: &mut Context,
16    url: String,
17    hnd: WorldHandle,
18) -> Result<Sender<FileMessage>, Error> {
19    let (mut ctx, sender) = ctx.create(Options::default())?;
20
21    let actor = FetchFile {
22        hnd,
23        sender: sender.clone(),
24        url,
25
26        pending: HashMap::new(),
27    };
28    ctx.start(actor)?;
29
30    Ok(sender.map(MessageImpl::Message))
31}
32
33struct FetchFile {
34    hnd: WorldHandle,
35    sender: Sender<MessageImpl>,
36    url: String,
37
38    pending: HashMap<Uuid, FileRead>,
39}
40
41impl Actor for FetchFile {
42    type Message = MessageImpl;
43
44    #[instrument("SystemFile", skip_all)]
45    fn process(&mut self, ctx: &mut Context, state: &mut State<Self>) -> Result<(), Error> {
46        while let Some(message) = state.next() {
47            match message {
48                MessageImpl::Message(message) => {
49                    self.on_message(message);
50                }
51                MessageImpl::FetchResult { id, data } => {
52                    self.on_fetch_result(ctx, id, data)?;
53                }
54            }
55        }
56
57        Ok(())
58    }
59}
60
61impl FetchFile {
62    fn on_message(&mut self, message: FileMessage) {
63        match message.action {
64            FileAction::Read(action) => {
65                self.on_read(message.id, action);
66            }
67            FileAction::Write { .. } => {
68                // TODO: Report back invalid operation
69            }
70        }
71    }
72
73    fn on_read(&mut self, id: Uuid, action: FileRead) {
74        event!(Level::INFO, "received read");
75
76        let range = action.offset..(action.offset + action.size);
77        self.pending.insert(id, action);
78
79        // TODO: Batch fetches, we can do multiple range requests at once
80        spawn_local(do_fetch(
81            self.hnd.clone(),
82            self.sender.clone(),
83            id,
84            self.url.clone(),
85            range,
86        ));
87    }
88
89    fn on_fetch_result(&mut self, ctx: &mut Context, id: Uuid, data: Vec<u8>) -> Result<(), Error> {
90        event!(Level::INFO, "received fetch result");
91
92        // TODO: Validate the actual result, we may not have gotten what we asked for, for example
93        //  a 200 response means we got the entire file instead of just the ranges.
94
95        let pending = self.pending.remove(&id).context("failed to find pending")?;
96
97        let message = ReadResult {
98            id,
99            offset: pending.offset,
100            data,
101        };
102        pending.on_result.send(ctx, message);
103
104        Ok(())
105    }
106}
107
108enum MessageImpl {
109    Message(FileMessage),
110    FetchResult { id: Uuid, data: Vec<u8> },
111}
112
113async fn do_fetch(
114    hnd: WorldHandle,
115    sender: Sender<MessageImpl>,
116    id: Uuid,
117    url: String,
118    range: Range<u64>,
119) {
120    event!(Level::INFO, "fetching data");
121
122    let window = web_sys::window().unwrap();
123
124    // Perform fetch
125    let headers = Headers::new().unwrap();
126    let range_header = format!("bytes={}-{}", range.start, range.end - 1);
127    event!(Level::TRACE, range = range_header);
128    headers.append("Range", &range_header).unwrap();
129
130    let mut opts = RequestInit::new();
131    opts.method("GET");
132    opts.mode(RequestMode::Cors);
133    opts.headers(&headers);
134
135    let request = Request::new_with_str_and_init(&url, &opts).unwrap();
136    let response = window.fetch_with_request(&request);
137
138    // Await the response
139    let response = JsFuture::from(response).await.unwrap();
140    let response: Response = response.dyn_into().unwrap();
141
142    // Await all the response data
143    let data = response.array_buffer().unwrap();
144    let data = JsFuture::from(data).await.unwrap();
145    let data: ArrayBuffer = data.dyn_into().unwrap();
146    let data = Uint8Array::new(&data).to_vec();
147
148    // Send the data back
149    let mut world = hnd.borrow_mut();
150    let mut ctx = world.root();
151    sender.send(&mut ctx, MessageImpl::FetchResult { id, data });
152    world.run_until_idle().unwrap();
153}
154
155/// TODO: Replace this with a more thought out executor abstraction.
156pub type WorldHandle = Rc<RefCell<World>>;