// divans/multithreading.rs

1#![cfg(not(feature="no-stdlib"))]
2use core;
3
4use std::sync::{Arc, Mutex, Condvar};
5use threading::{SerialWorker, MainToThread, ThreadToMain, CommandResult, ThreadData, NUM_SERIAL_COMMANDS_BUFFERED, NUM_DATA_BUFFERED,};
6use slice_util::{AllocatedMemoryRange, AllocatedMemoryPrefix};
7use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
8use alloc_util::RepurposingAlloc;
9use cmd_to_raw::DivansRecodeState;
10use interface::{PredictionModeContextMap, EncoderOrDecoderRecoderSpecialization, Command, DivansOpResult, DivansOutputResult, ErrMsg};
11use std::time::{SystemTime, Duration};
12use threading::{StaticCommand, PullAllocatedCommand, downcast_command};
// Capacity (in entries) of the per-handle ThreadEvent debug log: 8192 when
// the "threadlog" feature is enabled, 0 (logging compiled out) otherwise.
13#[cfg(feature="threadlog")]
14const MAX_LOG_SIZE: usize = 8192;
15#[cfg(not(feature="threadlog"))]
16const MAX_LOG_SIZE: usize = 0;
17
18
// Thread-safe handle to the shared SerialWorker queue. Cloning this struct
// shares the same underlying queue (via the Arc) while each clone keeps its
// own instrumentation log.
19pub struct MultiWorker<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> {
20    start: SystemTime, // creation time; origin for the relative event timestamps
21    queue: Arc<(Mutex<SerialWorker<AllocU8, AllocCommand>>, Condvar)>, // shared work queue plus its wakeup signal
22    log: [ThreadEvent; MAX_LOG_SIZE], // fixed-size event buffer (length 0 unless "threadlog" is enabled)
23    log_offset: u32, // next free slot in `log`; stops recording when the buffer is full
24}
// Tags for entries in the "threadlog" instrumentation buffer.
// Naming convention: `M_*` events are recorded by the main thread,
// `W_*` events by the worker thread; `*_WAIT_*` variants mark the
// condvar-wait (blocked) side of the corresponding operation.
25#[allow(dead_code)]
26#[allow(non_camel_case_types)]
27#[derive(Debug, Clone, Copy)]
28enum ThreadEventType {
29    M_PUSH_CONTEXT_MAP,
30    M_WAIT_PUSH_CONTEXT_MAP,
31    M_PUSH_DATA,
32    M_PUSH_EMPTY_DATA,
33    M_FAIL_PUSH_DATA,
34    M_PULL_COMMAND_RESULT,
35    M_BROADCAST_ERR,
36    M_WAIT_PULL_COMMAND_RESULT,
37    W_PULL_DATA,
38    W_WAIT_PULL_DATA,
39    W_PULL_CONTEXT_MAP,
40    W_WAIT_PULL_CONTEXT_MAP,
41    W_PUSH_CMD,
42    W_WAIT_PUSH_CMD,
43    W_PUSH_BATCH_CMD,
44    W_WAIT_PUSH_BATCH_CMD,
45    W_PUSH_CONSUMED_DATA,
46    W_WAIT_PUSH_CONSUMED_DATA,
47    W_PUSH_EOF,
48    W_WAIT_PUSH_EOF,
49    W_BROADCAST_ERR,
50}
// One log record: (event tag, quantity — e.g. byte or command count,
// time elapsed since the owning MultiWorker's `start`).
51#[derive(Debug, Clone, Copy)]
52struct ThreadEvent(ThreadEventType, u32, Duration);
// Captures the elapsed time since `$proc.start` for later logging.
// "Unguarded" because it is evaluated before the queue mutex is taken.
53#[cfg(feature="threadlog")]
54macro_rules! unguarded_debug_time {
55    ($proc: expr) => {
56        $proc.start.elapsed().unwrap_or(Duration::new(0,0))
57    }
58        
59}
// No-op stub when "threadlog" is disabled: evaluates to unit so the
// `let _elapsed = ...` call sites still compile at zero cost.
60#[cfg(not(feature="threadlog"))]
61macro_rules! unguarded_debug_time {
62    ($proc: expr) => {
63        ()
64    }
65}
// Records a ThreadEvent into `$proc`'s log (silently dropping events once
// the fixed-size buffer is full) and also echoes it live to stderr.
// `$timevar` is the pre-lock timestamp captured by unguarded_debug_time!.
66#[cfg(feature="threadlog")]
67macro_rules! thread_debug {
68    ($en: expr, $quant: expr, $proc: expr, $timevar: expr) => {
69        if $proc.log_offset as usize != $proc.log.len() {
70            $proc.log[$proc.log_offset as usize] = ThreadEvent($en, $quant as u32, $timevar);
71            $proc.log_offset += 1;
72        }
73        eprintln!("{:?} {} {:?}", $en, $quant, $proc.start.elapsed().unwrap_or(Duration::new(0,0)));
74    };
75}
76
// No-op stub when "threadlog" is disabled; arguments are discarded unevaluated.
77#[cfg(not(feature="threadlog"))]
78macro_rules! thread_debug {
79    ($a: expr, $b: expr, $c: expr, $d: expr) => {
80    };
81}
82
83impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> Clone for MultiWorker<AllocU8, AllocCommand> {
84    fn clone(&self) -> Self {
85        Self {
86            log:self.log.clone(),
87            log_offset:self.log_offset.clone(),
88            start:self.start,
89            queue:self.queue.clone(),
90        }
91    }
92}
93
// When "threadlog" is enabled, dump the accumulated event log to stderr on
// drop. Each line is prefixed with a short per-run epoch tag (last two
// seconds-digits * 100 + centiseconds of the creation time) so interleaved
// output from multiple handles can be grouped.
94#[cfg(feature="threadlog")]
95impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> Drop for MultiWorker<AllocU8, AllocCommand> {
96    fn drop(&mut self) {
97        let epoch_d = self.start.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::new(0,0));
           // 4-digit run identifier derived from wall-clock creation time.
98        let epoch = (epoch_d.as_secs()%100) * 100 + u64::from(epoch_d.subsec_nanos()) / 10000000;
99        let start_log = self.start.elapsed().unwrap_or(Duration::new(0,0));
100        use std;
101        use std::io::Write;
           // Lock stderr once so the whole log flushes as a contiguous block.
102        let stderr = std::io::stderr();
103        let mut handle = stderr.lock();
104        writeln!(handle, "{:04}:{:02}:{:09}:LOG_START", epoch, start_log.as_secs(), start_log.subsec_nanos()).unwrap();
           // Only `log_offset` entries were actually recorded; the rest are padding.
105        for entry in self.log[..self.log_offset as usize].iter() {
106            writeln!(handle, "{:04}:{:02}:{:09}:{:?}:{}", epoch, entry.2.as_secs(), entry.2.subsec_nanos(), entry.0, entry.1).unwrap();
107        }
108        let fin_log = self.start.elapsed().unwrap_or(Duration::new(0,0));
           // Trailer records how long the flush itself took.
109        writeln!(handle, "{:04}:{:02}:{:09}:LOG_FLUSH:{:?}ns", epoch, fin_log.as_secs(), fin_log.subsec_nanos(), fin_log - start_log).unwrap();
110    }
111}
112
113
114
115impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> MultiWorker<AllocU8, AllocCommand>
116{
117    pub fn new(mcommand: &mut AllocCommand) -> Self {
118        MultiWorker::<AllocU8, AllocCommand> {
119            log:[ThreadEvent(ThreadEventType::M_PUSH_EMPTY_DATA, 0, Duration::new(0,0)); MAX_LOG_SIZE],
120            log_offset:0,
121            start: SystemTime::now(),
122            queue: Arc::new((Mutex::new(SerialWorker::<AllocU8, AllocCommand>::new(mcommand)), Condvar::new())),
123        }
124    }
125    fn broadcast_err_internal(&mut self, err: ErrMsg, _thread_event_type: ThreadEventType) {
126        let _elapsed = unguarded_debug_time!(self);
127        let &(ref lock, ref cvar) = &*self.queue;
128        let mut worker = lock.lock().unwrap();
129        if worker.waiters != 0 {
130            cvar.notify_one();
131        }
132        let ret = worker.broadcast_err_internal(err);
133        thread_debug!(_thread_event_type, output.len(), self, _elapsed);
134        return ret;        
135    }
136    pub fn free(&mut self, m8: &mut RepurposingAlloc<u8, AllocU8>, mcommand: &mut AllocCommand) {
137        let &(ref lock, ref cvar) = &*self.queue;
138        let mut worker = lock.lock().unwrap();
139        if worker.waiters != 0 {
140            worker.broadcast_err_internal(ErrMsg::UnexpectedEof);
141            cvar.notify_one();
142        }
143        worker.free(m8, mcommand);
144    }
145}
146impl<AllocU8:Allocator<u8>, AllocCommand: Allocator<StaticCommand>> PullAllocatedCommand<AllocU8, AllocCommand> for MultiWorker<AllocU8, AllocCommand> {
147    fn pull_command_buf(&mut self,
148                        output:&mut AllocatedMemoryPrefix<StaticCommand, AllocCommand>,
149                        consumed_data:&mut [AllocatedMemoryRange<u8, AllocU8>;NUM_DATA_BUFFERED],
150                        pm:&mut [PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>; 2]) -> CommandResult {
151        self.pull(output, consumed_data, pm)
152    }
153}
154
// Main-thread side of the cross-thread channel: pushes raw input data and
// context maps into the shared SerialWorker and pulls command results back.
// Every method takes the queue mutex; the blocking ones loop on the condvar
// (a standard predicate-recheck loop, so spurious wakeups are harmless).
155impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> MainToThread<AllocU8> for MultiWorker<AllocU8, AllocCommand> {
156    const COOPERATIVE_MAIN:bool = false;
157    type CommandOutputType= <SerialWorker<AllocU8, AllocCommand> as MainToThread<AllocU8>>::CommandOutputType;
158    #[inline(always)]
       // Blocks until the worker has room for a context map, then hands it over.
159    fn push_context_map(&mut self, cm: PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>) -> Result<(),()> {
160        
161        loop { // FIXME: should this loop forever? We should never run out of context map room
162            let _elapsed = unguarded_debug_time!(self);
163            let &(ref lock, ref cvar) = &*self.queue;
164            let mut worker = lock.lock().unwrap();
165            if worker.cm_space_ready() {
166                thread_debug!(ThreadEventType::M_PUSH_CONTEXT_MAP, 1, self, _elapsed);
                   // Wake the worker thread, which may be blocked waiting for this map.
167                if worker.waiters != 0 {
168                    cvar.notify_one();
169                }
170                return worker.push_context_map(cm);
171            } else {
172                thread_debug!(ThreadEventType::M_WAIT_PUSH_CONTEXT_MAP, 0, self, _elapsed);
                   // No room yet: register as a waiter and sleep until notified.
173                worker.waiters += 1;
174                let _ign = cvar.wait(worker); // always safe to loop around again
175                _ign.unwrap().waiters -= 1;
176            }
177        }
178    }
179    #[inline(always)]
       // Non-blocking push of raw input bytes; on success the worker thread is
       // notified. On failure the Err is returned to the caller to retry.
180    fn push(&mut self, data: &mut AllocatedMemoryRange<u8, AllocU8>) -> Result<(),()> {
181        let _elapsed = unguarded_debug_time!(self);
182        let _len = data.len();
183        let &(ref lock, ref cvar) = &*self.queue;
184        let mut worker = lock.lock().unwrap();
185        match worker.push(data) {
186            Ok(()) => {
187                thread_debug!(ThreadEventType::M_PUSH_DATA, _len, self, _elapsed);
188                if worker.waiters != 0 {
189                    cvar.notify_one();
190                }
191                return Ok(());
192            },
193            err => {
                   // Distinguish "nothing to push" from a genuine full-queue failure
                   // in the log; either way the error is propagated unchanged.
194                if data.len() == 0 {
195                    thread_debug!(ThreadEventType::M_PUSH_EMPTY_DATA, 0, self, _elapsed);
196                } else {
197                    thread_debug!(ThreadEventType::M_FAIL_PUSH_DATA, 0, self, _elapsed);
198                }
199                return err
200            },
201        }
202    }
203
204    #[inline(always)]
       // Blocks until the worker has produced results (or has recorded an
       // error), then drains them into the caller-provided buffers.
205    fn pull(&mut self,
206            output:&mut Self::CommandOutputType,
207            consumed_data:&mut [AllocatedMemoryRange<u8, AllocU8>;NUM_DATA_BUFFERED],
208            pm:&mut [PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>; 2]) -> CommandResult {
209        loop {
210            let _elapsed = unguarded_debug_time!(self);
211            let &(ref lock, ref cvar) = &*self.queue;
212            let mut worker = lock.lock().unwrap();
213            if worker.result_ready() {
214                if worker.waiters != 0 {
215                    cvar.notify_one(); // FIXME: do we want to signal here?
216                }
217                let ret = worker.pull(output, consumed_data, pm);
218                thread_debug!(ThreadEventType::M_PULL_COMMAND_RESULT, output.len(), self, _elapsed);
219                return ret;
220            } else if worker.err.is_none() {
221                thread_debug!(ThreadEventType::M_WAIT_PULL_COMMAND_RESULT, 0, self, _elapsed);
222                worker.waiters += 1;
223                let _ign = cvar.wait(worker);
224                _ign.unwrap().waiters -= 1;
225                //return CommandResult::ProcessedData(AllocatedMemoryRange::<u8, AllocU8>::default()); // FIXME: busy wait
226            } else {
                   // A previously broadcast error terminates the pull loop.
227                return CommandResult::Err(worker.err.unwrap());
228            }
229        }
230    }
       // Propagates a fatal error to the worker thread (logged as M_BROADCAST_ERR).
231    fn broadcast_err(&mut self,
232                     err:ErrMsg) {
233        self.broadcast_err_internal(err, ThreadEventType::M_BROADCAST_ERR);
234    }
235}
236
// Worker-thread side of the channel: pulls data/context maps that the main
// thread pushed, and pushes command results/consumed data/EOF back. Same
// mutex + condvar predicate-loop discipline as the MainToThread impl.
237impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> ThreadToMain<AllocU8> for MultiWorker<AllocU8, AllocCommand> {
238    const COOPERATIVE:bool = false;
239    const ISOLATED:bool = true;
240    #[inline(always)]
       // Blocks until input data (or Yield/Eof) is available from the main thread.
       // NOTE(review): unlike the sibling methods, this path does not notify the
       // condvar after consuming data — confirm the main thread never waits for
       // data-queue space that only this drain would create.
241    fn pull_data(&mut self) -> ThreadData<AllocU8> {
242        loop {
243            let _elapsed = unguarded_debug_time!(self);
244            let &(ref lock, ref cvar) = &*self.queue;
245            let mut worker = lock.lock().unwrap();
246            if worker.data_ready() {
247                let ret = worker.pull_data();
                   // Log quantity: byte count for Data, 0 for Yield, sentinel for Eof.
248                thread_debug!(ThreadEventType::W_PULL_DATA, match ret {ThreadData::Data(ref d) => d.len(), ThreadData::Yield => 0, ThreadData::Eof=> 99999999,}, self, _elapsed);
249                return ret;
250            } else {
251                thread_debug!(ThreadEventType::W_WAIT_PULL_DATA, 0, self, _elapsed);
252                worker.waiters += 1;
253                let _ign = cvar.wait(worker);
254                _ign.unwrap().waiters -= 1;
255            }
256        }
257    }
258    #[inline(always)]
       // Blocks until a context map is available; notifies the main thread,
       // which may be waiting for context-map queue space.
259    fn pull_context_map(&mut self,
260                        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>) -> Result<PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>, ()> {
261        loop {
262            let _elapsed = unguarded_debug_time!(self);
263            let &(ref lock, ref cvar) = &*self.queue;
264            let mut worker = lock.lock().unwrap();
265            if worker.cm_ready() {
266                if worker.waiters != 0 {
267                    cvar.notify_one();
268                }
269                thread_debug!(ThreadEventType::W_PULL_CONTEXT_MAP, 1, self, _elapsed);
270                return worker.pull_context_map(m8);
271            } else {
272                thread_debug!(ThreadEventType::W_WAIT_PULL_CONTEXT_MAP, 0, self, _elapsed);
273                worker.waiters += 1;
274                let _ign = cvar.wait(worker);
275                _ign.unwrap().waiters -= 1;
276            }
277        }
278    }
279    #[inline(always)]
       // Blocks until there is space in the result queue, then forwards a single
       // decoded command to the main thread.
280    fn push_cmd<Specialization:EncoderOrDecoderRecoderSpecialization>(
281        &mut self,
282        cmd:&mut Command<AllocatedMemoryPrefix<u8, AllocU8>>,
283        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
284        recoder: Option<&mut DivansRecodeState<AllocU8::AllocatedMemory>>,
285        specialization: &mut Specialization,
286        output:&mut [u8],
287        output_offset: &mut usize,
288    ) -> DivansOutputResult {
289        loop {
290            let _elapsed = unguarded_debug_time!(self);
291            let &(ref lock, ref cvar) = &*self.queue;
292            let mut worker = lock.lock().unwrap();
293            if worker.result_space_ready() {
294                thread_debug!(ThreadEventType::W_PUSH_CMD, 1, self, _elapsed);
295                if worker.waiters != 0 {
296                    cvar.notify_one();
297                }
298                return worker.push_cmd(cmd, m8, recoder, specialization, output, output_offset);
299            } else {
300                thread_debug!(ThreadEventType::W_WAIT_PUSH_CMD, 0, self, _elapsed);
301                worker.waiters += 1;
302                let _ign = cvar.wait(worker);
303                _ign.unwrap().waiters -= 1;
304            }
305        }
306    }
307    #[inline(always)]
       // Blocks until there is result-queue space, then returns the consumed
       // input buffer to the main thread for reuse/accounting.
308    fn push_consumed_data(&mut self,
309                    data:&mut AllocatedMemoryRange<u8, AllocU8>,
310                    m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
311    ) -> DivansOutputResult {
           // Capture the length up front: `data` is drained by the push below.
312        let _len = data.len();
313        loop {
314            let _elapsed = unguarded_debug_time!(self);
315            let &(ref lock, ref cvar) = &*self.queue;
316            let mut worker = lock.lock().unwrap();
317            if worker.result_space_ready() {
318                if worker.waiters != 0 {
319                    cvar.notify_one();
320                }
321                thread_debug!(ThreadEventType::W_PUSH_CONSUMED_DATA, _len, self, _elapsed);
322                return worker.push_consumed_data(data, m8);
323            } else {
324                thread_debug!(ThreadEventType::W_WAIT_PUSH_CONSUMED_DATA, 0, self, _elapsed);
325                worker.waiters += 1;
326                let _ign = cvar.wait(worker);
327                _ign.unwrap().waiters -= 1;
328            }
329        }
330    }
331   #[inline(always)]
       // Blocks until there is result-queue space, then signals end-of-stream.
332    fn push_eof(&mut self,
333    ) -> DivansOutputResult {
334        loop {
335            let _elapsed = unguarded_debug_time!(self);
336            let &(ref lock, ref cvar) = &*self.queue;
337            let mut worker = lock.lock().unwrap();
338            if worker.result_space_ready() {
339                if worker.waiters != 0 {
340                    cvar.notify_one();
341                }
342                thread_debug!(ThreadEventType::W_PUSH_EOF, 1, self, _elapsed);
343                return worker.push_eof();
344            } else {
345                thread_debug!(ThreadEventType::W_WAIT_PUSH_EOF, 1, self, _elapsed);
346                worker.waiters += 1;
347                let _ign = cvar.wait(worker);
348                _ign.unwrap().waiters -=1;
349            }
350        }
351    }
       // Propagates a fatal error toward the main thread (logged as W_BROADCAST_ERR).
352    fn broadcast_err(&mut self, err: ErrMsg
353    ) {
354        self.broadcast_err_internal(err, ThreadEventType::W_BROADCAST_ERR);
355    }
356}
357
// Batching wrapper around MultiWorker: the worker thread accumulates decoded
// commands locally and flushes them in batches, reducing lock/condvar traffic.
358pub struct BufferedMultiWorker<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> {
359    pub worker: MultiWorker<AllocU8, AllocCommand>, // underlying shared-queue handle
360    buffer: AllocatedMemoryPrefix<StaticCommand, AllocCommand>, // local command batch (.1 = fill count)
361    min_buffer_push_len: usize, // adaptive batch-size threshold that triggers a flush
362}
363/*
364impl<AllocU8:Allocator<u8>, AllocCommand: Allocator<StaticCommand>> PullAllocatedCommand<AllocU8, AllocCommand> for BufferedMultiWorker<AllocU8, AllocCommand> {
365    fn pull_command_buf(&mut self) -> (&mut AllocatedMemoryPrefix<StaticCommand, AllocCommand>,
366                                       &mut [AllocatedMemoryRange<u8, AllocU8>;NUM_DATA_BUFFERED],
367                                       &mut [PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>; NUM_DATA_BUFFERED], CommandResult) {
368        self.pull()
369    }
370}*/
371
372
373impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> BufferedMultiWorker<AllocU8, AllocCommand> {
       // Creates a buffered wrapper with a NUM_SERIAL_COMMANDS_BUFFERED-slot
       // local batch and a small initial flush threshold (grows adaptively).
374    pub fn new(mc: &mut AllocCommand)->Self{
375        let worker = MultiWorker::<AllocU8, AllocCommand>::new(mc);
376        Self {
377            min_buffer_push_len: 2,
378            worker:worker,
379            buffer: AllocatedMemoryPrefix::realloc(mc.alloc_cell(NUM_SERIAL_COMMANDS_BUFFERED), 0),
380        }
381    }
       // Flushes the local command batch (plus optional consumed data and
       // prediction-mode map) to the shared worker, blocking until it fits.
       // The flush threshold grows on each call and shrinks when the shared
       // queue is nearly full, trading latency against lock contention.
382    fn force_push(&mut self, eof_inside: bool, data: &mut AllocatedMemoryRange<u8, AllocU8>, pm: Option<&mut PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>>) -> DivansOpResult {
           // NOTE(review): the gate checks `* 2` but the shift quadruples
           // (`<<= 2`); if plain doubling was intended this should be `<<= 1`
           // — confirm against the batching heuristic's intent.
383        if self.min_buffer_push_len * 2 < self.buffer.max_len(){
384            self.min_buffer_push_len <<= 2;
385        }
386        loop {
387            let _elapsed = unguarded_debug_time!(self.worker);
388            let &(ref lock, ref cvar) = &*self.worker.queue;
389            let mut worker = lock.lock().unwrap();
390            let mut did_notify = false;
391            if data.0.len() != 0 { // before we get to sending commands, lets make sure data is taken care of
392                match worker.push_consumed_data(data, None) {
393                    DivansOutputResult::Success => {
394                        thread_debug!(ThreadEventType::W_PUSH_CONSUMED_DATA, data.0.len() as u32, self.worker, _elapsed);
395                    },
                       // Queue full: wait for the main thread to drain and retry.
396                    DivansOutputResult::NeedsMoreOutput => {
397                        thread_debug!(ThreadEventType::W_WAIT_PUSH_CONSUMED_DATA, data.0.len(), self.worker, _elapsed);
398                        worker.waiters += 1;
399                        let _ign = cvar.wait(worker);
400                        _ign.unwrap().waiters -= 1;
401                        continue;
402                    }
403                    DivansOutputResult::Failure(e) => {
404                        worker.set_error(e);
405                    }
406                }
407                if worker.waiters != 0 && !did_notify{
408                    cvar.notify_one();
409                    did_notify = true;
410                }
411            }
412            if worker.result_multi_space_ready(self.buffer.1 as usize) {
413                thread_debug!(ThreadEventType::W_PUSH_CMD, self.buffer.1, self.worker, _elapsed);
414                if eof_inside {
415                    worker.set_eof_hint(); // so other side gets more aggressive about pulling
416                }
                   // NOTE(review): this branch never sets did_notify = true; that is
                   // harmless today only because the function returns right below.
417                if worker.waiters != 0 && !did_notify{
418                    cvar.notify_one();
419                }
420                let extant_space = worker.insert_results(&mut self.buffer, pm);
                   // Shared queue nearly full: halve the batch threshold (floor 4)
                   // so future flushes happen sooner and in smaller pieces.
421                if extant_space <= 16 {
422                    self.min_buffer_push_len = core::cmp::max(self.min_buffer_push_len >> 1, 4);
423                }
424                self.buffer.1 = 0;
425                return DivansOpResult::Success;
426            } else if worker.err.is_none() {
427                thread_debug!(ThreadEventType::W_WAIT_PUSH_CMD, self.buffer.1, self.worker, _elapsed);
428                worker.waiters += 1;
429                let _ign = cvar.wait(worker);
430                _ign.unwrap().waiters -= 1;
431            } else {
432                return DivansOpResult::Failure(worker.err.unwrap());
433            }
434        }
435    }
       // Releases the local batch buffer, then frees the wrapped worker.
436    pub fn free(&mut self, m8: &mut RepurposingAlloc<u8, AllocU8>, mc: &mut AllocCommand) {
437        mc.free_cell(core::mem::replace(&mut self.buffer.0,
438                                        AllocCommand::AllocatedMemory::default()));
439        self.worker.free(m8, mc);
440    }
441}
// Worker-thread interface of the buffered wrapper: pulls delegate straight
// through; pushes are batched locally and flushed via force_push.
442impl<AllocU8:Allocator<u8>, AllocCommand:Allocator<StaticCommand>> ThreadToMain<AllocU8> for BufferedMultiWorker<AllocU8, AllocCommand> {
443    const COOPERATIVE:bool = false;
444    const ISOLATED:bool = true;
445    #[inline(always)]
       // Pulls are not buffered: forward directly to the wrapped worker.
446    fn pull_data(&mut self) -> ThreadData<AllocU8> {
447        self.worker.pull_data()
448    }
449    #[inline(always)]
450    fn pull_context_map(&mut self,
451                        m8: Option<&mut RepurposingAlloc<u8, AllocU8>>) -> Result<PredictionModeContextMap<AllocatedMemoryPrefix<u8, AllocU8>>, ()> {
452        self.worker.pull_context_map(m8)
453    }
454    #[inline(always)]
       // Downcasts the command into the batch buffer; flushes immediately when
       // the command carries a prediction-mode map, or when the buffer is full
       // or has reached the adaptive threshold. Unused recode parameters are
       // accepted for trait compatibility only.
455    fn push_cmd<Specialization:EncoderOrDecoderRecoderSpecialization>(
456        &mut self,
457        cmd:&mut Command<AllocatedMemoryPrefix<u8, AllocU8>>,
458        _m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
459        _recoder: Option<&mut DivansRecodeState<AllocU8::AllocatedMemory>>,
460        _specialization: &mut Specialization,
461        _output:&mut [u8],
462        _output_offset: &mut usize,
463    ) -> DivansOutputResult {
464        let (static_command, pm) = downcast_command(cmd);
           // Safe: force_push resets buffer.1 to 0 before the buffer can fill,
           // so this slot is always in bounds.
465        self.buffer.0.slice_mut()[self.buffer.1 as usize] = static_command;
466        self.buffer.1 += 1;
467        if pm.is_some() {
               // Prediction-mode maps must reach the main thread promptly.
468            DivansOutputResult::from(self.force_push(false, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), pm))
469        } else if self.buffer.1 as usize == self.buffer.0.len() || self.buffer.1 as usize == self.min_buffer_push_len {
470            DivansOutputResult::from(self.force_push(false, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), None))
471        } else {
472            //FIXME: why does this case not do anything
               // Command stays buffered; a later push/eof will flush it.
473            DivansOutputResult::Success
474        }
475    }
476    #[inline(always)]
       // Consumed data triggers a flush so the batch and the data arrive together.
477    fn push_consumed_data(&mut self,
478                    data:&mut AllocatedMemoryRange<u8, AllocU8>,
479                    _m8: Option<&mut RepurposingAlloc<u8, AllocU8>>,
480    ) -> DivansOutputResult {
481        DivansOutputResult::from(self.force_push(false, data, None))
482    }
483   #[inline(always)]
       // EOF flushes any remaining buffered commands with the eof hint set.
484    fn push_eof(&mut self,
485    ) -> DivansOutputResult {
486        DivansOutputResult::from(self.force_push(true, &mut AllocatedMemoryRange::<u8, AllocU8>::default(), None))
487    }
488   #[inline(always)]
489    fn broadcast_err(&mut self, err: ErrMsg) {
490        self.worker.broadcast_err_internal(err, ThreadEventType::W_BROADCAST_ERR)
491    }
492}