1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
use crate::with_glean;
use glean_core::upload::PingUploadTask;
pub use glean_core::upload::{PingRequest, UploadResult};
pub use http_uploader::*;
mod http_uploader;
/// A component capable of uploading a single ping.
///
/// Implementors must be `Debug + Send + Sync` because the uploader is stored
/// in shared state and invoked from the dedicated upload thread.
pub trait PingUploader: std::fmt::Debug + Send + Sync {
/// Uploads one ping and reports the outcome.
///
/// # Arguments
///
/// * `url` - the full URL to send the ping to; the upload manager builds it
///   by concatenating the server endpoint and the request path.
/// * `body` - the raw request body bytes.
/// * `headers` - the request headers as `(name, value)` pairs.
///
/// # Returns
///
/// The `UploadResult` that is handed back to Glean for this ping.
fn upload(&self, url: String, body: Vec<u8>, headers: Vec<(String, String)>) -> UploadResult;
}
/// Drives ping uploads on a background thread.
///
/// The manager itself only holds a reference-counted handle to the shared
/// state; that handle is cloned into the upload thread when one is spawned.
#[derive(Debug)]
pub(crate) struct UploadManager {
// Shared with the upload thread via `Arc::clone`.
inner: Arc<Inner>,
}
/// State shared between the `UploadManager` and its upload thread.
#[derive(Debug)]
struct Inner {
// Prefix prepended to each request's path to form the upload URL.
server_endpoint: String,
// The uploader implementation every ping request is delegated to.
uploader: Box<dyn PingUploader + 'static>,
// `true` while an upload thread is considered running; consulted by
// `trigger_upload` so that a second thread is not started.
thread_running: AtomicBool,
}
impl UploadManager {
    /// Creates a new upload manager.
    ///
    /// No thread is started here; the upload thread is spawned lazily by
    /// [`UploadManager::trigger_upload`].
    ///
    /// # Arguments
    ///
    /// * `server_endpoint` - prefix prepended to every request path to build
    ///   the upload URL.
    /// * `new_uploader` - the [`PingUploader`] used to perform each upload.
    pub(crate) fn new(
        server_endpoint: String,
        new_uploader: Box<dyn PingUploader + 'static>,
    ) -> Self {
        Self {
            inner: Arc::new(Inner {
                server_endpoint,
                uploader: new_uploader,
                thread_running: AtomicBool::new(false),
            }),
        }
    }

    /// Starts the upload thread unless one is already running.
    ///
    /// The spawned thread (named `glean.upload`) repeatedly asks Glean for the
    /// next upload task and acts on it:
    ///
    /// * `Upload` - sends the request through the configured uploader and
    ///   reports the result back to Glean.
    /// * `Wait` - sleeps for the given number of milliseconds.
    /// * `Done` - clears the running flag and exits the thread.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    pub(crate) fn trigger_upload(&self) {
        // Atomically claim the "running" slot *before* spawning. A plain
        // `load` here followed by a `store(true)` inside the new thread leaves
        // a window in which several callers all observe `false` and each
        // spawn their own uploader thread.
        if self
            .inner
            .thread_running
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            log::debug!("The upload task is already running.");
            return;
        }

        let inner = Arc::clone(&self.inner);

        let spawned = thread::Builder::new()
            .name("glean.upload".into())
            .spawn(move || loop {
                let incoming_task = with_glean(|glean| glean.get_upload_task());

                match incoming_task {
                    PingUploadTask::Upload(request) => {
                        // Keep the document id: `request` is taken apart below,
                        // but the id is needed to report the result.
                        let doc_id = request.document_id.clone();
                        let upload_url = format!("{}{}", inner.server_endpoint, request.path);
                        let headers: Vec<(String, String)> =
                            request.headers.into_iter().collect();
                        let result = inner.uploader.upload(upload_url, request.body, headers);
                        with_glean(|glean| glean.process_ping_upload_response(&doc_id, result));
                    }
                    PingUploadTask::Wait(time) => {
                        thread::sleep(Duration::from_millis(time));
                    }
                    PingUploadTask::Done => {
                        // Release the slot so a later trigger can start a
                        // fresh thread.
                        inner.thread_running.store(false, Ordering::SeqCst);
                        return;
                    }
                }
            });

        if spawned.is_err() {
            // The flag was claimed above but no thread exists to clear it;
            // release it so the manager is not stuck in the "running" state.
            self.inner.thread_running.store(false, Ordering::SeqCst);
        }

        // The join handle is dropped on purpose: the thread runs detached.
        spawned.expect("Failed to spawn Glean's uploader thread");
    }
}