#![cfg(not(feature="no-stdlib"))]
use core;

use std::sync::{Arc, Mutex, Condvar};
use threading::{SerialWorker, MainToThread, ThreadToMain, CommandResult, ThreadData, NUM_SERIAL_COMMANDS_BUFFERED, NUM_DATA_BUFFERED};
use slice_util::{AllocatedMemoryRange, AllocatedMemoryPrefix};
use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
use alloc_util::RepurposingAlloc;
use cmd_to_raw::DivansRecodeState;
use interface::{PredictionModeContextMap, EncoderOrDecoderRecoderSpecialization, Command, DivansOpResult, DivansOutputResult, ErrMsg};
use std::time::{SystemTime, Duration};
use threading::{StaticCommand, PullAllocatedCommand, downcast_command};
#[cfg(feature="threadlog")]
const MAX_LOG_SIZE: usize = 8192;
#[cfg(not(feature="threadlog"))]
const MAX_LOG_SIZE: usize = 0;

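// Thread-safe handle to the shared work queue: a SerialWorker guarded by a Mutex,
// paired with a Condvar so the main and worker threads can sleep until space or
// data becomes available. `start`, `log`, and `log_offset` only carry real data
// when the "threadlog" feature is enabled (MAX_LOG_SIZE is 0 otherwise).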
pub struct MultiWorker<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> {
    start: SystemTime,
    queue: Arc<(Mutex<SerialWorker<AllocU8, AllocCommand>>, Condvar)>,
    log: [ThreadEvent; MAX_LOG_SIZE],
    log_offset: u32,
}
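// Event tags recorded when the "threadlog" feature is enabled. The M_ variants
// are emitted from the main thread (MainToThread) and the W_ variants from the
// worker thread (ThreadToMain); the WAIT variants mark blocking on the Condvar.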
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy)]
enum ThreadEventType {
    M_PUSH_CONTEXT_MAP,
    M_WAIT_PUSH_CONTEXT_MAP,
    M_PUSH_DATA,
    M_PUSH_EMPTY_DATA,
    M_FAIL_PUSH_DATA,
    M_PULL_COMMAND_RESULT,
    M_BROADCAST_ERR,
    M_WAIT_PULL_COMMAND_RESULT,
    W_PULL_DATA,
    W_WAIT_PULL_DATA,
    W_PULL_CONTEXT_MAP,
    W_WAIT_PULL_CONTEXT_MAP,
    W_PUSH_CMD,
    W_WAIT_PUSH_CMD,
    W_PUSH_BATCH_CMD,
    W_WAIT_PUSH_BATCH_CMD,
    W_PUSH_CONSUMED_DATA,
    W_WAIT_PUSH_CONSUMED_DATA,
    W_PUSH_EOF,
    W_WAIT_PUSH_EOF,
    W_BROADCAST_ERR,
}
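// A single log record: the event type, a size or count associated with the event,
// and the elapsed time since the MultiWorker was created.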
#[derive(Debug, Clone, Copy)]
struct ThreadEvent(ThreadEventType, u32, Duration);
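// Timing/logging helpers. With "threadlog" enabled, unguarded_debug_time! samples
// the elapsed time since `start` and thread_debug! appends a ThreadEvent to the
// bounded log (and echoes it to stderr). Without the feature both expand to no-ops.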
#[cfg(feature="threadlog")]
macro_rules! unguarded_debug_time {
    ($proc: expr) => {
        $proc.start.elapsed().unwrap_or(Duration::new(0,0))
    }
}
#[cfg(not(feature="threadlog"))]
macro_rules! unguarded_debug_time {
    ($proc: expr) => {
        ()
    }
}
#[cfg(feature="threadlog")]
macro_rules! thread_debug {
    ($en: expr, $quant: expr, $proc: expr, $timevar: expr) => {
        if $proc.log_offset as usize != $proc.log.len() {
            $proc.log[$proc.log_offset as usize] = ThreadEvent($en, $quant as u32, $timevar);
            $proc.log_offset += 1;
        }
        eprintln!("{:?} {} {:?}", $en, $quant, $proc.start.elapsed().unwrap_or(Duration::new(0,0)));
    };
}

#[cfg(not(feature="threadlog"))]
macro_rules! thread_debug {
    ($a: expr, $b: expr, $c: expr, $d: expr) => {
    };
}

impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> Clone for MultiWorker<AllocU8, AllocCommand> {
    fn clone(&self) -> Self {
        Self {
            log: self.log.clone(),
            log_offset: self.log_offset,
            start: self.start,
            queue: self.queue.clone(),
        }
    }
}

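// With "threadlog" enabled, dump the recorded events to stderr when the handle is
// dropped, prefixed with a coarse wall-clock tag (seconds mod 100 plus hundredths)
// so the logs emitted by the two threads can be lined up.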
#[cfg(feature="threadlog")]
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> Drop for MultiWorker<AllocU8, AllocCommand> {
    fn drop(&mut self) {
        let epoch_d = self.start.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::new(0,0));
        let epoch = (epoch_d.as_secs() % 100) * 100 + u64::from(epoch_d.subsec_nanos()) / 10000000;
        let start_log = self.start.elapsed().unwrap_or(Duration::new(0,0));
        use std;
        use std::io::Write;
        let stderr = std::io::stderr();
        let mut handle = stderr.lock();
        writeln!(handle, "{:04}:{:02}:{:09}:LOG_START", epoch, start_log.as_secs(), start_log.subsec_nanos()).unwrap();
        for entry in self.log[..self.log_offset as usize].iter() {
            writeln!(handle, "{:04}:{:02}:{:09}:{:?}:{}", epoch, entry.2.as_secs(), entry.2.subsec_nanos(), entry.0, entry.1).unwrap();
        }
        let fin_log = self.start.elapsed().unwrap_or(Duration::new(0,0));
        writeln!(handle, "{:04}:{:02}:{:09}:LOG_FLUSH:{:?}ns", epoch, fin_log.as_secs(), fin_log.subsec_nanos(), fin_log - start_log).unwrap();
    }
}

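// Construction and teardown. `new` allocates the shared SerialWorker queue;
// `broadcast_err_internal` forwards an error into the queue and wakes a parked
// waiter; `free` wakes any waiter with an UnexpectedEof error before releasing
// the worker's buffers.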
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> MultiWorker<AllocU8, AllocCommand>
{
    pub fn new(mcommand: &mut AllocCommand) -> Self {
        MultiWorker::<AllocU8, AllocCommand> {
            log: [ThreadEvent(ThreadEventType::M_PUSH_EMPTY_DATA, 0, Duration::new(0,0)); MAX_LOG_SIZE],
            log_offset: 0,
            start: SystemTime::now(),
            queue: Arc::new((Mutex::new(SerialWorker::<AllocU8, AllocCommand>::new(mcommand)), Condvar::new())),
        }
    }
    fn broadcast_err_internal(&mut self, err: ErrMsg, _thread_event_type: ThreadEventType) {
        let _elapsed = unguarded_debug_time!(self);
        let &(ref lock, ref cvar) = &*self.queue;
        let mut worker = lock.lock().unwrap();
        if worker.waiters != 0 {
            cvar.notify_one();
        }
        let ret = worker.broadcast_err_internal(err);
        // No command buffer is in scope here, so log a zero-length payload.
        thread_debug!(_thread_event_type, 0, self, _elapsed);
        return ret;
    }
    pub fn free(&mut self, m8: &mut RepurposingAlloc<u8, AllocU8>, mcommand: &mut AllocCommand) {
        let &(ref lock, ref cvar) = &*self.queue;
        let mut worker = lock.lock().unwrap();
        if worker.waiters != 0 {
            worker.broadcast_err_internal(ErrMsg::UnexpectedEof);
            cvar.notify_one();
        }
        worker.free(m8, mcommand);
    }
}
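// Forwarding impl: pull_command_buf simply delegates to MainToThread::pull.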
impl<AllocU8:Allocator<u8>, AllocCommand: Allocator<StaticCommand>> PullAllocatedCommand<AllocU8, AllocCommand> for MultiWorker<AllocU8, AllocCommand> {
    fn pull_command_buf(&mut self,
                        output: &mut AllocatedMemoryPrefix<StaticCommand, AllocCommand>,
                        consumed_data: &mut [AllocatedMemoryRange<u8, AllocU8>; NUM_DATA_BUFFERED],
                        pm: &mut [PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>; 2]) -> CommandResult {
        self.pull(output, consumed_data, pm)
    }
}

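// Main-thread side of the queue. Each call takes the lock and either completes
// immediately (notifying the worker if it is parked) or increments `waiters` and
// blocks on the Condvar until the worker makes space or produces results.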
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> MainToThread<AllocU8> for MultiWorker<AllocU8, AllocCommand> {
    const COOPERATIVE_MAIN: bool = false;
    type CommandOutputType = <SerialWorker<AllocU8, AllocCommand> as MainToThread<AllocU8>>::CommandOutputType;
    #[inline(always)]
    fn push_context_map(&mut self, cm: PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>) -> Result<(), ()> {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.cm_space_ready() {
                thread_debug!(ThreadEventType::M_PUSH_CONTEXT_MAP, 1, self, _elapsed);
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                return worker.push_context_map(cm);
            } else {
                thread_debug!(ThreadEventType::M_WAIT_PUSH_CONTEXT_MAP, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    #[inline(always)]
    fn push(&mut self, data: &mut AllocatedMemoryRange<u8, AllocU8>) -> Result<(), ()> {
        let _elapsed = unguarded_debug_time!(self);
        let _len = data.len();
        let &(ref lock, ref cvar) = &*self.queue;
        let mut worker = lock.lock().unwrap();
        match worker.push(data) {
            Ok(()) => {
                thread_debug!(ThreadEventType::M_PUSH_DATA, _len, self, _elapsed);
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                return Ok(());
            },
            err => {
                if data.len() == 0 {
                    thread_debug!(ThreadEventType::M_PUSH_EMPTY_DATA, 0, self, _elapsed);
                } else {
                    thread_debug!(ThreadEventType::M_FAIL_PUSH_DATA, 0, self, _elapsed);
                }
                return err;
            },
        }
    }

    #[inline(always)]
    fn pull(&mut self,
            output: &mut Self::CommandOutputType,
            consumed_data: &mut [AllocatedMemoryRange<u8, AllocU8>; NUM_DATA_BUFFERED],
            pm: &mut [PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>; 2]) -> CommandResult {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.result_ready() {
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                let ret = worker.pull(output, consumed_data, pm);
                thread_debug!(ThreadEventType::M_PULL_COMMAND_RESULT, output.len(), self, _elapsed);
                return ret;
            } else if worker.err.is_none() {
                thread_debug!(ThreadEventType::M_WAIT_PULL_COMMAND_RESULT, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            } else {
                return CommandResult::Err(worker.err.unwrap());
            }
        }
    }
    fn broadcast_err(&mut self, err: ErrMsg) {
        self.broadcast_err_internal(err, ThreadEventType::M_BROADCAST_ERR);
    }
}

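// Worker-thread side of the queue: pulls the raw data and context maps pushed by
// the main thread and pushes back commands, consumed data, and the EOF marker,
// blocking on the Condvar whenever the corresponding buffer is empty or full.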
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> ThreadToMain<AllocU8> for MultiWorker<AllocU8, AllocCommand> {
    const COOPERATIVE: bool = false;
    const ISOLATED: bool = true;
    #[inline(always)]
    fn pull_data(&mut self) -> ThreadData<AllocU8> {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.data_ready() {
                let ret = worker.pull_data();
                thread_debug!(ThreadEventType::W_PULL_DATA,
                              match ret {
                                  ThreadData::Data(ref d) => d.len(),
                                  ThreadData::Yield => 0,
                                  ThreadData::Eof => 99999999,
                              },
                              self, _elapsed);
                return ret;
            } else {
                thread_debug!(ThreadEventType::W_WAIT_PULL_DATA, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    #[inline(always)]
    fn pull_context_map(&mut self,
                        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>) -> Result<PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>, ()> {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.cm_ready() {
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                thread_debug!(ThreadEventType::W_PULL_CONTEXT_MAP, 1, self, _elapsed);
                return worker.pull_context_map(m8);
            } else {
                thread_debug!(ThreadEventType::W_WAIT_PULL_CONTEXT_MAP, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    #[inline(always)]
    fn push_cmd<Specialization:EncoderOrDecoderRecoderSpecialization>(
        &mut self,
        cmd: &mut Command<AllocatedMemoryPrefix<u8, AllocU8>>,
        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
        recoder: Option<&mut DivansRecodeState<AllocU8::AllocatedMemory>>,
        specialization: &mut Specialization,
        output: &mut [u8],
        output_offset: &mut usize,
    ) -> DivansOutputResult {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.result_space_ready() {
                thread_debug!(ThreadEventType::W_PUSH_CMD, 1, self, _elapsed);
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                return worker.push_cmd(cmd, m8, recoder, specialization, output, output_offset);
            } else {
                thread_debug!(ThreadEventType::W_WAIT_PUSH_CMD, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    #[inline(always)]
    fn push_consumed_data(&mut self,
                          data: &mut AllocatedMemoryRange<u8, AllocU8>,
                          m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
    ) -> DivansOutputResult {
        let _len = data.len();
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.result_space_ready() {
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                thread_debug!(ThreadEventType::W_PUSH_CONSUMED_DATA, _len, self, _elapsed);
                return worker.push_consumed_data(data, m8);
            } else {
                thread_debug!(ThreadEventType::W_WAIT_PUSH_CONSUMED_DATA, 0, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    #[inline(always)]
    fn push_eof(&mut self) -> DivansOutputResult {
        loop {
            let _elapsed = unguarded_debug_time!(self);
            let &(ref lock, ref cvar) = &*self.queue;
            let mut worker = lock.lock().unwrap();
            if worker.result_space_ready() {
                if worker.waiters != 0 {
                    cvar.notify_one();
                }
                thread_debug!(ThreadEventType::W_PUSH_EOF, 1, self, _elapsed);
                return worker.push_eof();
            } else {
                thread_debug!(ThreadEventType::W_WAIT_PUSH_EOF, 1, self, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            }
        }
    }
    fn broadcast_err(&mut self, err: ErrMsg) {
        self.broadcast_err_internal(err, ThreadEventType::W_BROADCAST_ERR);
    }
}

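// Wrapper around MultiWorker that batches commands in a local buffer and flushes
// them under a single lock acquisition instead of taking the Mutex once per
// command. `min_buffer_push_len` is the adaptive flush threshold.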
pub struct BufferedMultiWorker<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> {
    pub worker: MultiWorker<AllocU8, AllocCommand>,
    buffer: AllocatedMemoryPrefix<StaticCommand, AllocCommand>,
    min_buffer_push_len: usize,
}
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> BufferedMultiWorker<AllocU8, AllocCommand> {
    pub fn new(mc: &mut AllocCommand) -> Self {
        let worker = MultiWorker::<AllocU8, AllocCommand>::new(mc);
        Self {
            min_buffer_push_len: 2,
            worker: worker,
            buffer: AllocatedMemoryPrefix::realloc(mc.alloc_cell(NUM_SERIAL_COMMANDS_BUFFERED), 0),
        }
    }
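    // Flush the locally buffered commands (plus any consumed data or context map
    // that accompanies them) into the shared queue. The flush threshold is grown
    // before the attempt and shrunk again when the queue reports little remaining
    // space, so batch sizes adapt to how quickly the main thread drains results.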
    fn force_push(&mut self, eof_inside: bool, data: &mut AllocatedMemoryRange<u8, AllocU8>, pm: Option<&mut PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>>) -> DivansOpResult {
        if self.min_buffer_push_len * 2 < self.buffer.max_len() {
            self.min_buffer_push_len <<= 2;
        }
        loop {
            let _elapsed = unguarded_debug_time!(self.worker);
            let &(ref lock, ref cvar) = &*self.worker.queue;
            let mut worker = lock.lock().unwrap();
            let mut did_notify = false;
            if data.0.len() != 0 {
                match worker.push_consumed_data(data, None) {
                    DivansOutputResult::Success => {
                        thread_debug!(ThreadEventType::W_PUSH_CONSUMED_DATA, data.0.len() as u32, self.worker, _elapsed);
                    },
                    DivansOutputResult::NeedsMoreOutput => {
                        thread_debug!(ThreadEventType::W_WAIT_PUSH_CONSUMED_DATA, data.0.len(), self.worker, _elapsed);
                        worker.waiters += 1;
                        let _ign = cvar.wait(worker);
                        _ign.unwrap().waiters -= 1;
                        continue;
                    }
                    DivansOutputResult::Failure(e) => {
                        worker.set_error(e);
                    }
                }
                if worker.waiters != 0 && !did_notify {
                    cvar.notify_one();
                    did_notify = true;
                }
            }
            if worker.result_multi_space_ready(self.buffer.1 as usize) {
                thread_debug!(ThreadEventType::W_PUSH_CMD, self.buffer.1, self.worker, _elapsed);
                if eof_inside {
                    worker.set_eof_hint();
                }
                if worker.waiters != 0 && !did_notify {
                    cvar.notify_one();
                }
                let extant_space = worker.insert_results(&mut self.buffer, pm);
                if extant_space <= 16 {
                    self.min_buffer_push_len = core::cmp::max(self.min_buffer_push_len >> 1, 4);
                }
                self.buffer.1 = 0;
                return DivansOpResult::Success;
            } else if worker.err.is_none() {
                thread_debug!(ThreadEventType::W_WAIT_PUSH_CMD, self.buffer.1, self.worker, _elapsed);
                worker.waiters += 1;
                let _ign = cvar.wait(worker);
                _ign.unwrap().waiters -= 1;
            } else {
                return DivansOpResult::Failure(worker.err.unwrap());
            }
        }
    }
    pub fn free(&mut self, m8: &mut RepurposingAlloc<u8, AllocU8>, mc: &mut AllocCommand) {
        mc.free_cell(core::mem::replace(&mut self.buffer.0,
                                        AllocCommand::AllocatedMemory::default()));
        self.worker.free(m8, mc);
    }
}
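// ThreadToMain for the buffered variant: pulls are forwarded straight to the
// inner MultiWorker, while pushes go through the local command buffer.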
impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> ThreadToMain<AllocU8> for BufferedMultiWorker<AllocU8, AllocCommand> {
    const COOPERATIVE: bool = false;
    const ISOLATED: bool = true;
    #[inline(always)]
    fn pull_data(&mut self) -> ThreadData<AllocU8> {
        self.worker.pull_data()
    }
    #[inline(always)]
    fn pull_context_map(&mut self,
                        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>) -> Result<PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>, ()> {
        self.worker.pull_context_map(m8)
    }
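    // Downcast the command to its static form and append it to the local buffer;
    // flush immediately when the command carries a prediction-mode/context-map
    // update, when the buffer is full, or when the adaptive threshold is reached.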
    #[inline(always)]
    fn push_cmd<Specialization:EncoderOrDecoderRecoderSpecialization>(
        &mut self,
        cmd: &mut Command<AllocatedMemoryPrefix<u8, AllocU8>>,
        _m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
        _recoder: Option<&mut DivansRecodeState<AllocU8::AllocatedMemory>>,
        _specialization: &mut Specialization,
        _output: &mut [u8],
        _output_offset: &mut usize,
    ) -> DivansOutputResult {
        let (static_command, pm) = downcast_command(cmd);
        self.buffer.0.slice_mut()[self.buffer.1 as usize] = static_command;
        self.buffer.1 += 1;
        if pm.is_some() {
            DivansOutputResult::from(self.force_push(false, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), pm))
        } else if self.buffer.1 as usize == self.buffer.0.len() || self.buffer.1 as usize == self.min_buffer_push_len {
            DivansOutputResult::from(self.force_push(false, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), None))
        } else {
            DivansOutputResult::Success
        }
    }
    #[inline(always)]
    fn push_consumed_data(&mut self,
                          data: &mut AllocatedMemoryRange<u8, AllocU8>,
                          _m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
    ) -> DivansOutputResult {
        DivansOutputResult::from(self.force_push(false, data, None))
    }
    #[inline(always)]
    fn push_eof(&mut self) -> DivansOutputResult {
        DivansOutputResult::from(self.force_push(true, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), None))
    }
    #[inline(always)]
    fn broadcast_err(&mut self, err: ErrMsg) {
        self.worker.broadcast_err_internal(err, ThreadEventType::W_BROADCAST_ERR)
    }
}