Struct rust_control_plane::snapshot::Snapshot
source · Fields§
§resources: HashMap<String, Resources>
Implementations§
source§impl Snapshot
impl Snapshot
pub fn insert(&mut self, type_url: String, resources: Resources)
pub fn version(&self, type_url: &str) -> &str
Examples found in repository?
src/cache.rs (line 79)
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
/// Handles a discovery request for the node identified by `hash_id(&req.node)`.
///
/// Returns `Some(WatchId)` when a watch was registered through `set_watch`, and
/// `None` when `respond` was called with the snapshot's current resources and
/// version for `req.type_url`.
pub async fn create_watch(
    &self,
    req: &DiscoveryRequest,
    tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
    known_resource_names: &HashMap<String, HashSet<String>>,
) -> Option<WatchId> {
    let mut inner = self.inner.lock().await;
    let node_id = hash_id(&req.node);
    inner.update_node_status(&node_id);

    match inner.snapshots.get(&node_id) {
        None => {
            // Without a snapshot for this node there is nothing to send yet;
            // keep a watch so a snapshot set later can trigger it.
            info!("set watch: no snapshot");
            Some(inner.set_watch(&node_id, req, tx))
        }
        Some(snapshot) => {
            let current_resources = snapshot.resources(&req.type_url);
            let current_version = snapshot.version(&req.type_url);
            let names_for_type = known_resource_names.get(&req.type_url);

            // A request for a different set of resources is answered right away.
            if inner.is_requesting_new_resources(req, current_resources, names_for_type) {
                info!("responding: resource diff");
                respond(req, tx, current_resources, current_version).await;
                return None;
            }

            // The client's version differs from the snapshot's, so respond.
            if req.version_info != current_version {
                info!("responding: new version");
                respond(req, tx, current_resources, current_version).await;
                return None;
            }

            // The client already has the latest version; keep a watch because
            // a newer version may arrive later.
            info!("set watch: latest version");
            Some(inner.set_watch(&node_id, req, tx))
        }
    }
}
/// Deletes a watch previously created with `create_watch`.
///
/// Does nothing when the node has no status entry, or when the watch is no
/// longer registered (for example because `set_snapshot` already triggered
/// and removed it).
pub async fn cancel_watch(&self, watch_id: &WatchId) {
    let mut inner = self.inner.lock().await;
    if let Some(status) = inner.status.get_mut(&watch_id.node_id) {
        // NOTE(review): assumes `watches` is a `slab::Slab`, whose `remove`
        // panics on a vacant key — confirm against the field declaration.
        // `set_snapshot` removes triggered watches, so the id handed out by
        // `create_watch` may already be gone by the time it is cancelled.
        if status.watches.contains(watch_id.index) {
            status.watches.remove(watch_id.index);
        }
    }
}
/// Stores `snapshot` as the snapshot for `node` so that future requests receive it,
/// then triggers the node's existing watches whose requested version differs from
/// the snapshot's version for the watched type.
pub async fn set_snapshot(&self, node: &str, snapshot: Snapshot) {
    let mut inner = self.inner.lock().await;
    inner.snapshots.insert(node.to_string(), snapshot.clone());

    // A node without a status entry has no watches to trigger.
    let status = match inner.status.get_mut(node) {
        Some(status) => status,
        None => return,
    };

    // First pass: collect the ids of watches whose version is out of date.
    let outdated: Vec<_> = (&mut status.watches)
        .into_iter()
        .filter(|(_, watch)| snapshot.version(&watch.req.type_url) != watch.req.version_info)
        .map(|(id, _)| id)
        .collect();

    // Second pass: remove each outdated watch and answer it from the new snapshot.
    for id in outdated {
        let triggered = status.watches.remove(id);
        let resources = snapshot.resources(&triggered.req.type_url);
        let version = snapshot.version(&triggered.req.type_url);
        info!("watch triggered version={}", version);
        respond(&triggered.req, triggered.tx, resources, version).await;
    }
}
/// Builds a one-off response from the snapshot stored for the request's node.
///
/// Returns `FetchError::NotFound` when no snapshot is stored under
/// `hash_id(&req.node)`, and `FetchError::VersionUpToDate` when
/// `req.version_info` equals the snapshot's version for `req.type_url`.
/// Otherwise returns the result of `build_response`.
pub async fn fetch<'a>(
    &'a self,
    req: &'a DiscoveryRequest,
    type_url: &'static str,
) -> Result<DiscoveryResponse, FetchError> {
    let guard = self.inner.lock().await;
    let key = hash_id(&req.node);
    let snapshot = match guard.snapshots.get(&key) {
        Some(found) => found,
        None => return Err(FetchError::NotFound),
    };

    // NOTE(review): the version is looked up with `req.type_url` while the
    // resources are looked up with the `type_url` argument — confirm that the
    // two are always expected to match.
    let current = snapshot.version(&req.type_url);
    if req.version_info != current {
        let resources = snapshot.resources(type_url);
        Ok(build_response(req, resources, current))
    } else {
        Err(FetchError::VersionUpToDate)
    }
}
pub fn resources(&self, type_url: &str) -> Option<&Resources>
Examples found in repository?
src/cache.rs (line 78)
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
/// Handles a discovery request for the node identified by `hash_id(&req.node)`.
///
/// Returns `Some(WatchId)` when a watch was registered through `set_watch`, and
/// `None` when `respond` was called with the snapshot's current resources and
/// version for `req.type_url`.
pub async fn create_watch(
    &self,
    req: &DiscoveryRequest,
    tx: mpsc::Sender<(DiscoveryRequest, DiscoveryResponse)>,
    known_resource_names: &HashMap<String, HashSet<String>>,
) -> Option<WatchId> {
    let mut inner = self.inner.lock().await;
    let node_id = hash_id(&req.node);
    inner.update_node_status(&node_id);

    match inner.snapshots.get(&node_id) {
        None => {
            // Without a snapshot for this node there is nothing to send yet;
            // keep a watch so a snapshot set later can trigger it.
            info!("set watch: no snapshot");
            Some(inner.set_watch(&node_id, req, tx))
        }
        Some(snapshot) => {
            let current_resources = snapshot.resources(&req.type_url);
            let current_version = snapshot.version(&req.type_url);
            let names_for_type = known_resource_names.get(&req.type_url);

            // A request for a different set of resources is answered right away.
            if inner.is_requesting_new_resources(req, current_resources, names_for_type) {
                info!("responding: resource diff");
                respond(req, tx, current_resources, current_version).await;
                return None;
            }

            // The client's version differs from the snapshot's, so respond.
            if req.version_info != current_version {
                info!("responding: new version");
                respond(req, tx, current_resources, current_version).await;
                return None;
            }

            // The client already has the latest version; keep a watch because
            // a newer version may arrive later.
            info!("set watch: latest version");
            Some(inner.set_watch(&node_id, req, tx))
        }
    }
}
/// Deletes a watch previously created with `create_watch`.
///
/// Does nothing when the node has no status entry, or when the watch is no
/// longer registered (for example because `set_snapshot` already triggered
/// and removed it).
pub async fn cancel_watch(&self, watch_id: &WatchId) {
    let mut inner = self.inner.lock().await;
    if let Some(status) = inner.status.get_mut(&watch_id.node_id) {
        // NOTE(review): assumes `watches` is a `slab::Slab`, whose `remove`
        // panics on a vacant key — confirm against the field declaration.
        // `set_snapshot` removes triggered watches, so the id handed out by
        // `create_watch` may already be gone by the time it is cancelled.
        if status.watches.contains(watch_id.index) {
            status.watches.remove(watch_id.index);
        }
    }
}
/// Stores `snapshot` as the snapshot for `node` so that future requests receive it,
/// then triggers the node's existing watches whose requested version differs from
/// the snapshot's version for the watched type.
pub async fn set_snapshot(&self, node: &str, snapshot: Snapshot) {
    let mut inner = self.inner.lock().await;
    inner.snapshots.insert(node.to_string(), snapshot.clone());

    // A node without a status entry has no watches to trigger.
    let status = match inner.status.get_mut(node) {
        Some(status) => status,
        None => return,
    };

    // First pass: collect the ids of watches whose version is out of date.
    let outdated: Vec<_> = (&mut status.watches)
        .into_iter()
        .filter(|(_, watch)| snapshot.version(&watch.req.type_url) != watch.req.version_info)
        .map(|(id, _)| id)
        .collect();

    // Second pass: remove each outdated watch and answer it from the new snapshot.
    for id in outdated {
        let triggered = status.watches.remove(id);
        let resources = snapshot.resources(&triggered.req.type_url);
        let version = snapshot.version(&triggered.req.type_url);
        info!("watch triggered version={}", version);
        respond(&triggered.req, triggered.tx, resources, version).await;
    }
}
/// Builds a one-off response from the snapshot stored for the request's node.
///
/// Returns `FetchError::NotFound` when no snapshot is stored under
/// `hash_id(&req.node)`, and `FetchError::VersionUpToDate` when
/// `req.version_info` equals the snapshot's version for `req.type_url`.
/// Otherwise returns the result of `build_response`.
pub async fn fetch<'a>(
    &'a self,
    req: &'a DiscoveryRequest,
    type_url: &'static str,
) -> Result<DiscoveryResponse, FetchError> {
    let guard = self.inner.lock().await;
    let key = hash_id(&req.node);
    let snapshot = match guard.snapshots.get(&key) {
        Some(found) => found,
        None => return Err(FetchError::NotFound),
    };

    // NOTE(review): the version is looked up with `req.type_url` while the
    // resources are looked up with the `type_url` argument — confirm that the
    // two are always expected to match.
    let current = snapshot.version(&req.type_url);
    if req.version_info != current {
        let resources = snapshot.resources(type_url);
        Ok(build_response(req, resources, current))
    } else {
        Err(FetchError::VersionUpToDate)
    }
}
Trait Implementations§
Auto Trait Implementations§
impl RefUnwindSafe for Snapshot
impl Send for Snapshot
impl Sync for Snapshot
impl Unpin for Snapshot
impl UnwindSafe for Snapshot
Blanket Implementations§
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`