Struct rust_control_plane::cache::Cache
source · pub struct Cache { /* private fields */ }
Implementations§
source§impl Cache
impl Cache
sourcepub async fn create_watch(
&self,
req: &DiscoveryRequest,
tx: Sender<(DiscoveryRequest, DiscoveryResponse)>,
known_resource_names: &HashMap<String, HashSet<String>>
) -> Option<WatchId>
pub async fn create_watch(
&self,
req: &DiscoveryRequest,
tx: Sender<(DiscoveryRequest, DiscoveryResponse)>,
known_resource_names: &HashMap<String, HashSet<String>>
) -> Option<WatchId>
Examples found in repository?
src/service/stream.rs (line 75)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
pub async fn handle_stream(
mut stream: Streaming<DiscoveryRequest>,
tx: mpsc::Sender<Result<DiscoveryResponse, Status>>,
type_url: &str,
cache: Arc<Cache>,
) {
let mut nonce: i64 = 0;
let mut known_resource_names: HashMap<String, HashSet<String>> = HashMap::new();
let (watches_tx, mut watches_rx) = mpsc::channel(16);
let mut node = None;
let mut last_responses: HashMap<String, LastResponse> = HashMap::new();
let mut watches = Watches::new(cache.clone());
loop {
tokio::select! {
request = stream.next() => {
let mut req = request.unwrap().unwrap();
info!("received request version={:?} type={:?} resources={:?} nonce={:?}",
req.version_info, &req.type_url[20..], req.resource_names, req.response_nonce);
// Node might only be sent on the first request to save sending the same data
// repeatedly, so let's cache it in memory for future requests on this stream.
if req.node.is_some() {
node = req.node.clone();
} else {
req.node = node.clone();
}
if type_url == ANY_TYPE && req.type_url.is_empty() {
// Type URL is required for ADS (ANY_TYPE) because we can't tell from just the
// gRPC method which resource this request is for.
error(tx, Status::invalid_argument("type URL is required for ADS")).await;
return;
} else if req.type_url.is_empty() {
// Type URL is otherwise optional, but let's set it for consistency.
req.type_url = type_url.to_string();
}
// If this is an ack of a previous response, record that the client has received
// the resource names for that response.
if let Some(last_response) = last_responses.get(&req.type_url) {
if last_response.nonce == 0 || last_response.nonce == nonce {
let entry = known_resource_names.entry(req.type_url.clone());
entry
.and_modify(|entry| {
last_response.resource_names.iter().for_each(|name| {
entry.insert(name.clone());
})
})
.or_insert_with(HashSet::new);
}
}
let mut watch_id = None;
if let Some(watch) = watches.get(type_url) {
// A watch already exists so we need to replace it if this is a valid ack.
if watch.nonce.is_none() || watch.nonce == Some(nonce) {
cache.cancel_watch(&watch.id).await;
watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
}
} else {
// No watch exists yet so we can just create one.
watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
}
if let Some(id) = watch_id {
watches.add(type_url, id);
}
}
Some(mut rep) = watches_rx.recv() => {
nonce += 1;
rep.1.nonce = nonce.to_string();
let last_response = LastResponse{
nonce,
resource_names: rep.0.resource_names,
};
last_responses.insert(rep.0.type_url.clone(), last_response);
tx.send(Ok(rep.1)).await.unwrap();
if let Some(watch) = watches.get_mut(&rep.0.type_url) {
watch.nonce = Some(nonce)
}
}
}
}
}
sourcepub async fn cancel_watch(&self, watch_id: &WatchId)
pub async fn cancel_watch(&self, watch_id: &WatchId)
Examples found in repository?
More examples
src/service/stream.rs (line 74)
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
pub async fn handle_stream(
mut stream: Streaming<DiscoveryRequest>,
tx: mpsc::Sender<Result<DiscoveryResponse, Status>>,
type_url: &str,
cache: Arc<Cache>,
) {
let mut nonce: i64 = 0;
let mut known_resource_names: HashMap<String, HashSet<String>> = HashMap::new();
let (watches_tx, mut watches_rx) = mpsc::channel(16);
let mut node = None;
let mut last_responses: HashMap<String, LastResponse> = HashMap::new();
let mut watches = Watches::new(cache.clone());
loop {
tokio::select! {
request = stream.next() => {
let mut req = request.unwrap().unwrap();
info!("received request version={:?} type={:?} resources={:?} nonce={:?}",
req.version_info, &req.type_url[20..], req.resource_names, req.response_nonce);
// Node might only be sent on the first request to save sending the same data
// repeatedly, so let's cache it in memory for future requests on this stream.
if req.node.is_some() {
node = req.node.clone();
} else {
req.node = node.clone();
}
if type_url == ANY_TYPE && req.type_url.is_empty() {
// Type URL is required for ADS (ANY_TYPE) because we can't tell from just the
// gRPC method which resource this request is for.
error(tx, Status::invalid_argument("type URL is required for ADS")).await;
return;
} else if req.type_url.is_empty() {
// Type URL is otherwise optional, but let's set it for consistency.
req.type_url = type_url.to_string();
}
// If this is an ack of a previous response, record that the client has received
// the resource names for that response.
if let Some(last_response) = last_responses.get(&req.type_url) {
if last_response.nonce == 0 || last_response.nonce == nonce {
let entry = known_resource_names.entry(req.type_url.clone());
entry
.and_modify(|entry| {
last_response.resource_names.iter().for_each(|name| {
entry.insert(name.clone());
})
})
.or_insert_with(HashSet::new);
}
}
let mut watch_id = None;
if let Some(watch) = watches.get(type_url) {
// A watch already exists so we need to replace it if this is a valid ack.
if watch.nonce.is_none() || watch.nonce == Some(nonce) {
cache.cancel_watch(&watch.id).await;
watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
}
} else {
// No watch exists yet so we can just create one.
watch_id = cache.create_watch(&req, watches_tx.clone(), &known_resource_names).await;
}
if let Some(id) = watch_id {
watches.add(type_url, id);
}
}
Some(mut rep) = watches_rx.recv() => {
nonce += 1;
rep.1.nonce = nonce.to_string();
let last_response = LastResponse{
nonce,
resource_names: rep.0.resource_names,
};
last_responses.insert(rep.0.type_url.clone(), last_response);
tx.send(Ok(rep.1)).await.unwrap();
if let Some(watch) = watches.get_mut(&rep.0.type_url) {
watch.nonce = Some(nonce)
}
}
}
}
}
pub async fn set_snapshot(&self, node: &str, snapshot: Snapshot)
sourcepub async fn fetch<'a>(
&'a self,
req: &'a DiscoveryRequest,
type_url: &'static str
) -> Result<DiscoveryResponse, FetchError>
pub async fn fetch<'a>(
&'a self,
req: &'a DiscoveryRequest,
type_url: &'static str
) -> Result<DiscoveryResponse, FetchError>
Examples found in repository?
src/service/common.rs (line 44)
39 40 41 42 43 44 45 46 47 48 49 50 51
pub async fn fetch(
&self,
req: &DiscoveryRequest,
type_url: &'static str,
) -> Result<Response<DiscoveryResponse>, Status> {
match self.cache.fetch(req, type_url).await {
Ok(resp) => Ok(Response::new(resp)),
Err(FetchError::NotFound) => Err(Status::not_found("Resource not found for node")),
Err(FetchError::VersionUpToDate) => {
Err(Status::already_exists("Version already up to date"))
}
}
}
pub async fn node_status(&self) -> HashMap<String, Instant>
Trait Implementations§
Auto Trait Implementations§
impl !RefUnwindSafe for Cache
impl Send for Cache
impl Sync for Cache
impl Unpin for Cache
impl !UnwindSafe for Cache
Blanket Implementations§
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T
in a tonic::Request