1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
use std; use hyper; use tokio_core; use futures; use futures::Sink; use futures::Future; type ChunkResult = Result<hyper::Chunk, hyper::Error>; type ChunkSender = futures::sync::mpsc::Sender<ChunkResult>; pub type ChunkReceiver = futures::sync::mpsc::Receiver<ChunkResult>; pub struct StreamProvider { pub remote: tokio_core::reactor::Remote, pub tx: ChunkSender } impl StreamProvider { pub fn new(ev_loop: &tokio_core::reactor::Remote) -> (StreamProvider, ChunkReceiver) { let (tx, rx) = futures::sync::mpsc::channel(64); (StreamProvider { remote: ev_loop.clone(), tx: tx }, rx) } pub fn into_boxed(self) -> Box<StreamProvider> { Box::new(self) } pub unsafe fn from_raw_boxed(raw: *mut StreamProvider) -> Box<StreamProvider> { Box::from_raw(raw) } pub fn send_chunk(&mut self, chunk: &[u8]) { let chunk = hyper::Chunk::from(chunk.to_vec()); let tx = self.tx.clone(); self.remote.spawn(move |_| { tx.send(Ok(chunk)).map_err(|_| ()).map(|_| ()) }); } pub fn close(self) { let remote = self.remote; let mut tx = self.tx; remote.spawn(move |_| { tx.close().map_err(|_| ()).map(|_| ()) }); } } #[no_mangle] pub unsafe fn ice_core_stream_provider_send_chunk(sp: *mut StreamProvider, data: *const u8, len: u32) { let sp = &mut *sp; sp.send_chunk(std::slice::from_raw_parts(data, len as usize)); } #[no_mangle] pub unsafe fn ice_core_destroy_stream_provider(sp: *mut StreamProvider) { let sp = StreamProvider::from_raw_boxed(sp); sp.close(); }