1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use cyfs_base::*;
use crate::{
    stack::{Stack}, 
    ndn::{*, channel::{*, protocol::v0::*}, chunk::{ChunkStreamCache, RawCache}}
};


/// Starts uploading the chunk requested by `interest` over the channel `to`.
///
/// The chunk encoder is built from the cache that the stack's chunk manager returns for
/// `interest.chunk`, using the codec description derived from `interest.prefer_type`.
/// The new upload session is then added to the root upload task under `owners`.
///
/// Declared `async` for its callers, although the body contains no await points.
///
/// # Errors
///
/// Propagates any error returned by `to.upload(..)` or by `add_task(..)`.
pub async fn start_upload_task(
    stack: &Stack,
    interest: &Interest,
    to: &Channel,
    owners: Vec<String>,
) -> BuckyResult<Box<dyn UploadTask>> {
    let chunk = &interest.chunk;

    // Resolve the codec description first; both the encoder and the session use it.
    let codec_desc = interest.prefer_type.fill_values(chunk);
    let chunk_cache = stack.ndn().chunk_manager().create_cache(chunk);
    let chunk_encoder = chunk_cache.create_encoder(&codec_desc);

    let upload_session = to.upload(
        chunk.clone(),
        interest.session_id.clone(),
        codec_desc.clone(),
        chunk_encoder,
    )?;

    // Only the success/failure of the registration matters; its value is discarded.
    let _ = stack
        .ndn()
        .root_task()
        .upload()
        .add_task(owners, &upload_session)?;

    Ok(upload_session.clone_as_task())
}

/// Starts uploading the chunk requested by `interest` over the channel `to`, reading the
/// chunk content from the caller-supplied raw `cache`.
///
/// A fresh `ChunkStreamCache` is created for `interest.chunk`, loaded with `cache`, and used
/// to create the encoder. The new upload session is then added to the root upload task
/// under `owners`.
///
/// Declared `async` for its callers, although the body contains no await points.
///
/// # Errors
///
/// Propagates any error returned by `ChunkStreamCache::load`, `to.upload(..)` or
/// `add_task(..)`.
pub async fn start_upload_task_from_cache<T: RawCache + 'static>(
    stack: &Stack,
    interest: &Interest,
    to: &Channel,
    owners: Vec<String>,
    cache: T,
) -> BuckyResult<Box<dyn UploadTask>> {
    let chunk = &interest.chunk;
    let codec_desc = interest.prefer_type.fill_values(chunk);

    // Wrap the raw cache in a stream cache before asking it for an encoder.
    let chunk_stream = ChunkStreamCache::new(chunk);
    chunk_stream.load(true, Box::new(cache))?;
    let chunk_encoder = chunk_stream.create_encoder(&codec_desc);

    let upload_session = to.upload(
        chunk.clone(),
        interest.session_id.clone(),
        codec_desc.clone(),
        chunk_encoder,
    )?;

    // Only the success/failure of the registration matters; its value is discarded.
    let _ = stack
        .ndn()
        .root_task()
        .upload()
        .add_task(owners, &upload_session)?;

    Ok(upload_session.clone_as_task())
}


/// Default handler for internal NDN events that must be reported up to the stack level;
/// all such events are implemented here in one place.
///
/// The code here is policy: it may change, or may later be extended through configuration.
///
/// The type carries no state, so `new()`, `default()` and `clone()` all yield equivalent
/// values.
#[derive(Clone, Default)]
pub struct DefaultNdnEventHandler {}

impl DefaultNdnEventHandler {
    /// Creates the (stateless) default handler.
    pub fn new() -> Self {
        Self {}
    }
}

#[async_trait::async_trait]
impl NdnEventHandler for DefaultNdnEventHandler {
    /// Called for piece data that does not belong to any known download session.
    ///
    /// The default policy never creates a session for it: this always returns an
    /// `Interrupted` error with the message "no session downloading".
    fn on_unknown_piece_data(
        &self,
        _stack: &Stack,
        _piece: &PieceData,
        _from: &Channel,
    ) -> BuckyResult<DownloadSession> {
        Err(BuckyError::new(
            BuckyErrorCode::Interrupted,
            "no session downloading",
        ))
    }

    /// Handles a brand-new interest request; interest requests that are already being
    /// uploaded are not passed to this method.
    ///
    /// The default policy starts an upload task towards `from` with an empty owner list.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `start_upload_task`.
    async fn on_newly_interest(
        &self,
        stack: &Stack,
        interest: &Interest,
        from: &Channel,
    ) -> BuckyResult<()> {
        // `from` is already a `&Channel`; pass it through directly instead of re-borrowing.
        // The returned task handle is not needed here, only the success of the start.
        let _ = start_upload_task(stack, interest, from, vec![]).await?;

        Ok(())
    }
}