1use std::cell::RefCell;
20use std::fmt;
21use std::thread_local;
22
23use futures::stream;
24use futures::stream::StreamExt;
25use futures::stream::TryStreamExt;
26
27use crate::id::Vertex;
28use crate::iddag::FirstAncestorConstraint;
29use crate::iddag::IdDag;
30use crate::iddagstore::IdDagStore;
31use crate::ops::IdConvert;
32use crate::Group;
33use crate::Id;
34#[cfg(any(test, feature = "indexedlog-backend"))]
35use crate::IdMap;
36use crate::IdSet;
37use crate::Result;
38
/// Request asking where the vertexes with the given names are located.
///
/// In this file it is built by the `Process<Vec<Vertex>, RequestNameToLocation>`
/// implementation and answered by the
/// `Process<RequestNameToLocation, ResponseIdNamePair>` implementation.
#[derive(Debug, Clone)]
pub struct RequestNameToLocation {
    /// Names of the vertexes to locate.
    pub names: Vec<Vertex>,
    /// Names of the heads the answer is computed against. The responder in
    /// this file only resolves names that are ancestors of these heads; the
    /// builder in this file fills it from the master group's heads.
    pub heads: Vec<Vertex>,
}
48
/// Request asking for the names of the vertexes at the given locations.
///
/// In this file it is built from an `IdSet` by the
/// `Process<IdSet, RequestLocationToName>` implementation.
#[derive(Debug, Clone)]
pub struct RequestLocationToName {
    /// Locations (in `x~n` form, possibly batched) whose names are wanted.
    pub paths: Vec<AncestorPath>,
}
55
/// Response pairing locations with vertex names.
///
/// It is the output for both request kinds in this file.
#[derive(Debug, Clone)]
pub struct ResponseIdNamePair {
    /// Each entry is a path plus the names of the vertexes it covers.
    /// The responders in this file emit one name per vertex in the batch:
    /// the first name is for `x~n`, each following name is for the first
    /// ancestor of the previous one.
    pub path_names: Vec<(AncestorPath, Vec<Vertex>)>,
}
66
/// A location written as `x~n`, optionally covering a batch of vertexes.
///
/// `Display` prints `x~n`; `Debug` additionally prints `(+batch_size)` when
/// `batch_size` is not 1.
#[derive(Clone)]
pub struct AncestorPath {
    /// Name of the starting vertex of the path.
    pub x: Vertex,

    /// How many first-ancestor steps to take from `x` (the code in this file
    /// resolves the path with `first_ancestor_nth(x, n)`).
    pub n: u64,

    /// Number of vertexes covered, starting at `x~n` and repeatedly following
    /// the first ancestor. 1 means just `x~n`.
    pub batch_size: u64,
}
80
81impl fmt::Display for AncestorPath {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 write!(f, "{:?}~{}", self.x, self.n)
84 }
85}
86
87impl fmt::Debug for AncestorPath {
88 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89 write!(f, "{}", self)?;
90 if self.batch_size != 1 {
91 write!(f, "(+{})", self.batch_size)?;
92 }
93 Ok(())
94 }
95}
96
/// Abstraction over the service used to translate between vertex names and
/// `x~n` locations.
///
/// Both resolve methods return pairs of a path and the names of the vertexes
/// that path covers, the same shape as `ResponseIdNamePair::path_names`.
#[async_trait::async_trait]
pub trait RemoteIdConvertProtocol: Send + Sync + 'static {
    /// Resolves `names` into paths, given `heads`.
    ///
    /// NOTE(review): presumably the returned paths are expressed relative to
    /// (ancestors of) `heads`, mirroring `RequestNameToLocation` — confirm
    /// against implementors.
    async fn resolve_names_to_relative_paths(
        &self,
        heads: Vec<Vertex>,
        names: Vec<Vertex>,
    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>>;

    /// Resolves `paths` into the names of the vertexes they cover.
    async fn resolve_relative_paths_to_names(
        &self,
        paths: Vec<AncestorPath>,
    ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>>;

    /// Returns `false` unless overridden. The `()` implementation in this
    /// file overrides it to return `true`.
    fn is_local(&self) -> bool {
        false
    }
}
132
133#[async_trait::async_trait]
134impl RemoteIdConvertProtocol for () {
135 async fn resolve_names_to_relative_paths(
136 &self,
137 _heads: Vec<Vertex>,
138 _names: Vec<Vertex>,
139 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
140 Ok(Default::default())
141 }
142
143 async fn resolve_relative_paths_to_names(
144 &self,
145 paths: Vec<AncestorPath>,
146 ) -> Result<Vec<(AncestorPath, Vec<Vertex>)>> {
147 let msg = format!(
148 "Asked to resolve {:?} in graph but remote protocol is not configured",
149 paths
150 );
151 crate::errors::programming(msg)
152 }
153
154 fn is_local(&self) -> bool {
155 true
156 }
157}
158
/// Crate-internal conversion step: consumes `self` together with an input of
/// type `I` and asynchronously produces an `O`.
///
/// The implementations in this file use a tuple of an id map and an `IdDag`
/// as `self`, and convert between the request/response types defined above.
#[async_trait::async_trait]
pub(crate) trait Process<I, O> {
    /// Runs the conversion, returning the output or the first error hit.
    async fn process(self, input: I) -> Result<O>;
}
171
172#[async_trait::async_trait]
177impl<M: IdConvert, DagStore: IdDagStore> Process<Vec<Vertex>, RequestNameToLocation>
178 for (&M, &IdDag<DagStore>)
179{
180 async fn process(self, names: Vec<Vertex>) -> Result<RequestNameToLocation> {
181 let map = &self.0;
182 let dag = &self.1;
183 let heads = stream::iter(dag.heads_ancestors(dag.master_group()?)?).boxed();
186 let heads = heads
187 .then(|id| map.vertex_name(id))
188 .try_collect::<Vec<Vertex>>()
189 .await
190 .map_err(|e| {
191 let msg = format!(
192 concat!(
193 "Cannot resolve heads in master group to vertex name. ",
194 "The vertex name is required for remote vertex resolution. ",
195 "This probably indicates the Dag update logic does not ensure the ",
196 "vertex name of heads exist as it should. ",
197 "(Error: {})",
198 ),
199 e
200 );
201 crate::Error::Programming(msg)
202 })?;
203 Ok(RequestNameToLocation { names, heads })
204 }
205}
206
207#[async_trait::async_trait]
210impl<M: IdConvert, DagStore: IdDagStore> Process<IdSet, RequestLocationToName>
211 for (&M, &IdDag<DagStore>)
212{
213 async fn process(self, ids: IdSet) -> Result<RequestLocationToName> {
214 let map = &self.0;
215 let dag = &self.1;
216 let heads = dag.heads_ancestors(dag.master_group()?)?;
217
218 let mut id_path: Vec<(Id, u64, u64)> = Vec::with_capacity(ids.as_spans().len());
219 let mut last_id_opt = None;
220 for id in ids.into_iter() {
221 if let Some(last_id) = last_id_opt {
222 if dag.try_first_ancestor_nth(last_id, 1)? == Some(id) {
223 if let Some(last) = id_path.last_mut() {
225 last.2 += 1;
226 last_id_opt = Some(id);
227 continue;
228 }
229 }
230 }
231 let (x, n) = dag
232 .to_first_ancestor_nth(
233 id,
234 FirstAncestorConstraint::KnownUniversally {
235 heads: heads.clone(),
236 },
237 )?
238 .ok_or_else(|| {
239 if id.group() == Group::MASTER {
240 let msg = format!(
241 concat!(
242 "Cannot convert {} to x~n form using heads {:?}. ",
243 "This is unexpected. It indicates some serious bugs in graph ",
244 "calculation or the graph is corrupted (ex. has cycles).",
245 ),
246 id, &heads,
247 );
248 crate::Error::Bug(msg)
249 } else {
250 let msg = format!(
251 concat!(
252 "Cannot convert {} to x~n form. This is unexpected for non-master ",
253 "vertexes since they are expected to be non-lazy.",
254 ),
255 id
256 );
257 crate::Error::Programming(msg)
258 }
259 })?;
260 id_path.push((x, n, 1));
261 last_id_opt = Some(id);
262 }
263
264 let paths = stream::iter(id_path)
265 .then(|(x, n, batch_size)| async move {
266 let x = map.vertex_name(x).await.map_err(|e| {
267 let msg = format!(
268 concat!(
269 "Cannot resolve {} in to vertex name (Error: {}). ",
270 "The vertex name is required for remote vertex resolution. ",
271 "This probably indicates the Dag clone or update logic does ",
272 "not maintain \"universally known\" vertexes as it should.",
273 ),
274 x, e,
275 );
276 crate::Error::Programming(msg)
277 })?;
278 Ok::<_, crate::Error>(AncestorPath { x, n, batch_size })
279 })
280 .try_collect::<Vec<_>>()
281 .await?;
282
283 Ok(RequestLocationToName { paths })
284 }
285}
286
287#[async_trait::async_trait]
290impl<M: IdConvert, DagStore: IdDagStore> Process<RequestNameToLocation, ResponseIdNamePair>
291 for (&M, &IdDag<DagStore>)
292{
293 async fn process(self, request: RequestNameToLocation) -> Result<ResponseIdNamePair> {
294 let map = &self.0;
295 let dag = &self.1;
296
297 let heads: IdSet = {
298 let heads = stream::iter(request.heads);
299 let heads = heads
300 .then(|s| map.vertex_id(s))
301 .try_collect::<Vec<Id>>()
302 .await?;
303 IdSet::from_spans(heads)
304 };
305 let resolvable = dag.ancestors(heads.clone())?;
306
307 let id_names: Vec<(Id, Vertex)> = {
308 let ids_result = map.vertex_id_batch(&request.names).await?;
309 let mut id_names = Vec::with_capacity(ids_result.len());
310 for (name, id_result) in request.names.into_iter().zip(ids_result) {
311 match id_result {
312 Err(crate::Error::VertexNotFound(n)) => {
314 tracing::trace!(
315 "RequestNameToLocation -> ResponseIdNamePair: skip unknown name {:?}",
316 &n
317 );
318 continue;
319 }
320 Err(e) => {
321 return Err(e);
322 }
323 Ok(id) => {
324 if resolvable.contains(id) {
325 id_names.push((id, name))
326 }
327 }
328 }
329 }
330 id_names
331 };
332
333 let path_names: Vec<(AncestorPath, Vec<Vertex>)> = {
334 let x_n_names: Vec<(Id, u64, Vertex)> = id_names
335 .into_iter()
336 .filter_map(|(id, name)| {
337 match dag.to_first_ancestor_nth(
338 id,
339 FirstAncestorConstraint::KnownUniversally {
340 heads: heads.clone(),
341 },
342 ) {
343 Err(e) => Some(Err(e)),
344 Ok(None) => None,
346 Ok(Some((x, n))) => Some(Ok((x, n, name))),
347 }
348 })
349 .collect::<Result<Vec<_>>>()?;
350
351 stream::iter(x_n_names)
353 .then(|(x, n, name)| async move {
354 let x = map.vertex_name(x).await?;
355 Ok::<_, crate::Error>((
356 AncestorPath {
357 x,
358 n,
359 batch_size: 1,
360 },
361 vec![name],
362 ))
363 })
364 .try_collect()
365 .await?
366 };
367
368 Ok(ResponseIdNamePair { path_names })
369 }
370}
371
372#[async_trait::async_trait]
375impl<M: IdConvert, DagStore: IdDagStore> Process<RequestLocationToName, ResponseIdNamePair>
376 for (&M, &IdDag<DagStore>)
377{
378 async fn process(self, request: RequestLocationToName) -> Result<ResponseIdNamePair> {
379 let map = &self.0;
380 let dag = &self.1;
381
382 let path_names: Vec<(AncestorPath, Vec<Vertex>)> = stream::iter(request.paths.into_iter())
383 .then(|path| async move {
384 let id = map.vertex_id(path.x.clone()).await?;
385 let mut id = dag.first_ancestor_nth(id, path.n)?;
386 let mut ids = Vec::with_capacity(path.batch_size as _);
387 for i in 0..path.batch_size {
388 if i > 0 {
389 id = dag.first_ancestor_nth(id, 1)?;
390 }
391 ids.push(id);
392 }
393 let fallible_names = map.vertex_name_batch(&ids).await?;
394 let mut names = Vec::with_capacity(fallible_names.len());
395 for name in fallible_names {
396 names.push(name?);
397 }
398 debug_assert_eq!(path.batch_size, names.len() as u64);
399 Ok::<_, crate::Error>((path, names))
400 })
401 .try_collect()
402 .await?;
403 Ok(ResponseIdNamePair { path_names })
404 }
405}
406
/// Applies a `ResponseIdNamePair` to a local `IdMap`.
#[cfg(any(test, feature = "indexedlog-backend"))]
#[async_trait::async_trait]
impl<'a, DagStore: IdDagStore> Process<ResponseIdNamePair, ()>
    for (&'a mut IdMap, &'a IdDag<DagStore>)
{
    /// Inserts every (id, name) pair described by `res` into the map.
    ///
    /// For each path, `x` must already be known to the map (otherwise a
    /// not-found error is returned). The first name is stored for `x~n` and
    /// each following name for the first ancestor of the previous id.
    async fn process(self, res: ResponseIdNamePair) -> Result<()> {
        use crate::errors::NotFoundError;

        let (map, dag) = self;
        for (path, names) in &res.path_names {
            let known = map.find_id_by_name(path.x.as_ref())?;
            let x: Id = known.ok_or_else(|| path.x.not_found_error())?;
            let mut id = dag.first_ancestor_nth(x, path.n)?;
            tracing::trace!("insert path {:?} names {:?} (x = {})", &path, &names, id);
            for (offset, name) in names.iter().enumerate() {
                // Every name after the first belongs to the next first ancestor.
                if offset != 0 {
                    id = dag.first_ancestor_nth(id, 1)?;
                }
                tracing::trace!(" insert {:?} = {:?}", id, &name);
                map.insert(id, name.as_ref())?;
            }
        }
        Ok(())
    }
}
436
thread_local! {
    /// Number of `disable_remote_protocol` calls currently active on this
    /// thread. Non-zero means the remote protocol is disabled.
    static NON_BLOCKING_DEPTH: RefCell<usize> = RefCell::new(0);
}

/// Runs `f` with the remote protocol marked as disabled on the current
/// thread, and returns whatever `f` returns.
///
/// Calls may be nested; the protocol stays disabled until the outermost call
/// finishes. The marker is also cleared when `f` panics, so a caught panic
/// does not leave the thread permanently disabled.
pub(crate) fn disable_remote_protocol<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    /// Undoes the depth increment when dropped, on both the normal and the
    /// unwinding path.
    struct DepthGuard;

    impl Drop for DepthGuard {
        fn drop(&mut self) {
            // `try_with` avoids a second panic if the thread-local has
            // already been destroyed during thread teardown.
            let _ = NON_BLOCKING_DEPTH.try_with(|v| *v.borrow_mut() -= 1);
        }
    }

    NON_BLOCKING_DEPTH.with(|v| *v.borrow_mut() += 1);
    let _guard = DepthGuard;
    f()
}

/// Returns `true` while at least one `disable_remote_protocol` call is active
/// on the current thread.
pub(crate) fn is_remote_protocol_disabled() -> bool {
    NON_BLOCKING_DEPTH.with(|v| *v.borrow() != 0)
}