1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std;
use hyper;
use tokio_core;
use futures;
use futures::Sink;
use futures::Future;

type ChunkResult = Result<hyper::Chunk, hyper::Error>;
type ChunkSender = futures::sync::mpsc::Sender<ChunkResult>;
pub type ChunkReceiver = futures::sync::mpsc::Receiver<ChunkResult>;

pub struct StreamProvider {
    pub remote: tokio_core::reactor::Remote,
    pub tx: ChunkSender
}

impl StreamProvider {
    pub fn new(ev_loop: &tokio_core::reactor::Remote) -> (StreamProvider, ChunkReceiver) {
        let (tx, rx) = futures::sync::mpsc::channel(64);

        (StreamProvider {
            remote: ev_loop.clone(),
            tx: tx
        }, rx)
    }

    pub fn into_boxed(self) -> Box<StreamProvider> {
        Box::new(self)
    }

    pub unsafe fn from_raw_boxed(raw: *mut StreamProvider) -> Box<StreamProvider> {
        Box::from_raw(raw)
    }

    pub fn send_chunk(&mut self, chunk: &[u8]) {
        let chunk = hyper::Chunk::from(chunk.to_vec());
        let tx = self.tx.clone();

        self.remote.spawn(move |_| {
            tx.send(Ok(chunk)).map_err(|_| ()).map(|_| ())
        });
    }

    pub fn close(self) {
        let remote = self.remote;
        let mut tx = self.tx;

        remote.spawn(move |_| {
            tx.close().map_err(|_| ()).map(|_| ())
        });
    }
}

#[no_mangle]
pub unsafe fn ice_core_stream_provider_send_chunk(sp: *mut StreamProvider, data: *const u8, len: u32) {
    let sp = &mut *sp;
    sp.send_chunk(std::slice::from_raw_parts(data, len as usize));
}

#[no_mangle]
pub unsafe fn ice_core_destroy_stream_provider(sp: *mut StreamProvider) {
    let sp = StreamProvider::from_raw_boxed(sp);
    sp.close();
}