/// Cache type documented on this page. The examples below share it as `Arc<Cache>` and call
/// `create_watch` and `cancel_watch` on it.
pub struct Cache { /* private fields */ }

Implementations§

Examples found in repository?
src/cache.rs (line 56)
55
56
57
    /// Returns the value produced by `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
Examples found in repository?
src/service/stream.rs (line 75)
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/// Drives a single discovery stream until it ends.
///
/// Each loop iteration waits for one of two events:
///
/// * the next `DiscoveryRequest` from `stream`, which is normalised (cached node, default type
///   URL) and then used to create or replace the cache watch for its type URL; or
/// * a response delivered on the internal watch channel, which is stamped with the next nonce
///   and forwarded to the client through `tx`.
///
/// `type_url` is the type URL to assume when a request does not carry one. When it equals
/// `ANY_TYPE` (ADS) every request must carry its own type URL.
///
/// The function returns when the client closes the stream, the stream yields an error, an ADS
/// request arrives without a type URL, or the receiving side of `tx` has been dropped.
pub async fn handle_stream(
    mut stream: Streaming<DiscoveryRequest>,
    tx: mpsc::Sender<Result<DiscoveryResponse, Status>>,
    type_url: &str,
    cache: Arc<Cache>,
) {
    // Counter used to stamp every response sent on this stream.
    let mut nonce: i64 = 0;
    // Resource names the client has acknowledged, per type URL.
    let mut known_resource_names: HashMap<String, HashSet<String>> = HashMap::new();
    let (watches_tx, mut watches_rx) = mpsc::channel(16);
    let mut node = None;
    // Most recent response sent for each type URL.
    let mut last_responses: HashMap<String, LastResponse> = HashMap::new();
    let mut watches = Watches::new(cache.clone());

    loop {
        tokio::select! {
            request = stream.next() => {
                // A finished or failed stream is a normal way for a client to go away, so stop
                // serving it instead of panicking.
                let mut req = match request {
                    Some(Ok(req)) => req,
                    Some(Err(status)) => {
                        info!("closing stream after receive error: {:?}", status);
                        return;
                    }
                    None => return,
                };

                // Log the type URL without its first 20 bytes (presumably the
                // "type.googleapis.com/" prefix). Fall back to the full URL when it is shorter
                // than that, e.g. empty, rather than panicking on the slice.
                let short_type = req.type_url.get(20..).unwrap_or(req.type_url.as_str());
                info!("received request version={:?} type={:?} resources={:?} nonce={:?}",
                      req.version_info, short_type, req.resource_names, req.response_nonce);

                // Node might only be sent on the first request to save sending the same data
                // repeatedly, so let's cache it in memory for future requests on this stream.
                if req.node.is_some() {
                    node = req.node.clone();
                } else {
                    req.node = node.clone();
                }

                if type_url == ANY_TYPE && req.type_url.is_empty() {
                    // Type URL is required for ADS (ANY_TYPE) because we can't tell from just the
                    // gRPC method which resource this request is for.
                    error(tx, Status::invalid_argument("type URL is required for ADS")).await;
                    return;
                } else if req.type_url.is_empty() {
                    // Type URL is otherwise optional, but let's set it for consistency.
                    req.type_url = type_url.to_string();
                }

                // If this is an ack of a previous response, record that the client has received
                // the resource names for that response.
                // NOTE(review): this compares against the stream-wide nonce counter rather than
                // `req.response_nonce` — confirm that is intended.
                if let Some(last_response) = last_responses.get(&req.type_url) {
                    if last_response.nonce == 0 || last_response.nonce == nonce {
                        // `or_default` creates the set on the first ack so that ack's names are
                        // recorded too, instead of being replaced by an empty set.
                        known_resource_names
                            .entry(req.type_url.clone())
                            .or_default()
                            .extend(last_response.resource_names.iter().cloned());
                    }
                }

                // Watches are keyed by the request's own type URL, the same key the response
                // branch below uses, so that on ADS each resource type keeps a separate watch.
                let mut watch_id = None;
                if let Some(watch) = watches.get(&req.type_url) {
                    // A watch already exists so we need to replace it if this is a valid ack.
                    if watch.nonce.is_none() || watch.nonce == Some(nonce) {
                        cache.cancel_watch(&watch.id).await;
                        watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
                    }
                } else {
                    // No watch exists yet so we can just create one.
                    watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
                }
                if let Some(id) = watch_id {
                    watches.add(&req.type_url, id);
                }
            }
            Some(mut rep) = watches_rx.recv() => {
                // `rep.0` is the request that opened the watch, `rep.1` the response to send.
                nonce += 1;
                rep.1.nonce = nonce.to_string();
                let last_response = LastResponse{
                    nonce,
                    resource_names: rep.0.resource_names,
                };
                last_responses.insert(rep.0.type_url.clone(), last_response);
                if tx.send(Ok(rep.1)).await.is_err() {
                    // The receiving half was dropped, so nobody can read further responses.
                    return;
                }
                if let Some(watch) = watches.get_mut(&rep.0.type_url) {
                    watch.nonce = Some(nonce)
                }
            }
        }
    }
}
Examples found in repository?
src/service/watches.rs (line 44)
42
43
44
45
46
/// Cancels every watch in `active` by passing its id to `cache.cancel_watch`, one at a time.
pub async fn cancel_all(active: HashMap<String, Watch>, cache: Arc<Cache>) {
    // Only the watches matter here; the map keys are not used.
    for watch in active.values() {
        cache.cancel_watch(&watch.id).await;
    }
}
More examples
Hide additional examples
src/service/stream.rs (line 74)
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/// Drives a single discovery stream until it ends.
///
/// Each loop iteration waits for one of two events:
///
/// * the next `DiscoveryRequest` from `stream`, which is normalised (cached node, default type
///   URL) and then used to create or replace the cache watch for its type URL; or
/// * a response delivered on the internal watch channel, which is stamped with the next nonce
///   and forwarded to the client through `tx`.
///
/// `type_url` is the type URL to assume when a request does not carry one. When it equals
/// `ANY_TYPE` (ADS) every request must carry its own type URL.
///
/// The function returns when the client closes the stream, the stream yields an error, an ADS
/// request arrives without a type URL, or the receiving side of `tx` has been dropped.
pub async fn handle_stream(
    mut stream: Streaming<DiscoveryRequest>,
    tx: mpsc::Sender<Result<DiscoveryResponse, Status>>,
    type_url: &str,
    cache: Arc<Cache>,
) {
    // Counter used to stamp every response sent on this stream.
    let mut nonce: i64 = 0;
    // Resource names the client has acknowledged, per type URL.
    let mut known_resource_names: HashMap<String, HashSet<String>> = HashMap::new();
    let (watches_tx, mut watches_rx) = mpsc::channel(16);
    let mut node = None;
    // Most recent response sent for each type URL.
    let mut last_responses: HashMap<String, LastResponse> = HashMap::new();
    let mut watches = Watches::new(cache.clone());

    loop {
        tokio::select! {
            request = stream.next() => {
                // A finished or failed stream is a normal way for a client to go away, so stop
                // serving it instead of panicking.
                let mut req = match request {
                    Some(Ok(req)) => req,
                    Some(Err(status)) => {
                        info!("closing stream after receive error: {:?}", status);
                        return;
                    }
                    None => return,
                };

                // Log the type URL without its first 20 bytes (presumably the
                // "type.googleapis.com/" prefix). Fall back to the full URL when it is shorter
                // than that, e.g. empty, rather than panicking on the slice.
                let short_type = req.type_url.get(20..).unwrap_or(req.type_url.as_str());
                info!("received request version={:?} type={:?} resources={:?} nonce={:?}",
                      req.version_info, short_type, req.resource_names, req.response_nonce);

                // Node might only be sent on the first request to save sending the same data
                // repeatedly, so let's cache it in memory for future requests on this stream.
                if req.node.is_some() {
                    node = req.node.clone();
                } else {
                    req.node = node.clone();
                }

                if type_url == ANY_TYPE && req.type_url.is_empty() {
                    // Type URL is required for ADS (ANY_TYPE) because we can't tell from just the
                    // gRPC method which resource this request is for.
                    error(tx, Status::invalid_argument("type URL is required for ADS")).await;
                    return;
                } else if req.type_url.is_empty() {
                    // Type URL is otherwise optional, but let's set it for consistency.
                    req.type_url = type_url.to_string();
                }

                // If this is an ack of a previous response, record that the client has received
                // the resource names for that response.
                // NOTE(review): this compares against the stream-wide nonce counter rather than
                // `req.response_nonce` — confirm that is intended.
                if let Some(last_response) = last_responses.get(&req.type_url) {
                    if last_response.nonce == 0 || last_response.nonce == nonce {
                        // `or_default` creates the set on the first ack so that ack's names are
                        // recorded too, instead of being replaced by an empty set.
                        known_resource_names
                            .entry(req.type_url.clone())
                            .or_default()
                            .extend(last_response.resource_names.iter().cloned());
                    }
                }

                // Watches are keyed by the request's own type URL, the same key the response
                // branch below uses, so that on ADS each resource type keeps a separate watch.
                let mut watch_id = None;
                if let Some(watch) = watches.get(&req.type_url) {
                    // A watch already exists so we need to replace it if this is a valid ack.
                    if watch.nonce.is_none() || watch.nonce == Some(nonce) {
                        cache.cancel_watch(&watch.id).await;
                        watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
                    }
                } else {
                    // No watch exists yet so we can just create one.
                    watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
                }
                if let Some(id) = watch_id {
                    watches.add(&req.type_url, id);
                }
            }
            Some(mut rep) = watches_rx.recv() => {
                // `rep.0` is the request that opened the watch, `rep.1` the response to send.
                nonce += 1;
                rep.1.nonce = nonce.to_string();
                let last_response = LastResponse{
                    nonce,
                    resource_names: rep.0.resource_names,
                };
                last_responses.insert(rep.0.type_url.clone(), last_response);
                if tx.send(Ok(rep.1)).await.is_err() {
                    // The receiving half was dropped, so nobody can read further responses.
                    return;
                }
                if let Some(watch) = watches.get_mut(&rep.0.type_url) {
                    watch.nonce = Some(nonce)
                }
            }
        }
    }
}
Examples found in repository?
src/service/common.rs (line 44)
39
40
41
42
43
44
45
46
47
48
49
50
51
    /// Fetches the response for `req` from the cache and converts the outcome into a gRPC
    /// result.
    ///
    /// A successful lookup is wrapped in a `Response`. `FetchError::NotFound` becomes a
    /// `not_found` status and `FetchError::VersionUpToDate` becomes an `already_exists` status.
    pub async fn fetch(
        &self,
        req: &DiscoveryRequest,
        type_url: &'static str,
    ) -> Result<Response<DiscoveryResponse>, Status> {
        let outcome = self.cache.fetch(req, type_url).await;
        outcome.map(Response::new).map_err(|failure| match failure {
            FetchError::NotFound => Status::not_found("Resource not found for node"),
            FetchError::VersionUpToDate => {
                Status::already_exists("Version already up to date")
            }
        })
    }

Trait Implementations§

Formats the value using the given formatter. Read more
Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Wrap the input message T in a tonic::Request
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more