1use std::cell::RefCell;
20use std::fmt;
21use std::thread_local;
22
23use futures::stream;
24use futures::stream::StreamExt;
25use futures::stream::TryStreamExt;
26
27use crate::id::VertexName;
28use crate::iddag::FirstAncestorConstraint;
29use crate::iddag::IdDag;
30use crate::iddagstore::IdDagStore;
31use crate::ops::IdConvert;
32use crate::Group;
33use crate::Id;
34#[cfg(any(test, feature = "indexedlog-backend"))]
35use crate::IdMap;
36use crate::IdSet;
37use crate::Result;
38
/// Request asking where a set of vertex names sit in the graph, relative to
/// a set of heads.
///
/// In this file it is built by the `Process<Vec<VertexName>, RequestNameToLocation>`
/// impl and answered by the `Process<RequestNameToLocation, ResponseIdNamePair>` impl.
#[derive(Debug, Clone)]
pub struct RequestNameToLocation {
    /// Names whose locations are being asked for.
    pub names: Vec<VertexName>,
    /// Heads bounding the lookup. The answering side in this file only
    /// reports names whose ids are ancestors of these heads.
    pub heads: Vec<VertexName>,
}
48
/// Request asking for the vertex names found at a list of ancestor paths.
///
/// In this file it is built from an `IdSet` by the
/// `Process<IdSet, RequestLocationToName>` impl and answered by the
/// `Process<RequestLocationToName, ResponseIdNamePair>` impl.
#[derive(Debug, Clone)]
pub struct RequestLocationToName {
    /// Paths to resolve. Each path may cover several consecutive first
    /// ancestors (see `AncestorPath::batch_size`).
    pub paths: Vec<AncestorPath>,
}
55
/// Response pairing ancestor paths with the vertex names found along them.
///
/// Produced for both request kinds in this file, and consumed by the
/// `Process<ResponseIdNamePair, ()>` impl, which inserts the names into an `IdMap`.
#[derive(Debug, Clone)]
pub struct ResponseIdNamePair {
    /// `(path, names)` entries. The consumer in this file maps `names[0]` to
    /// the vertex at `path.x~path.n` and each following name to the first
    /// parent of the previous vertex.
    pub path_names: Vec<(AncestorPath, Vec<VertexName>)>,
}
66
/// Location of one or more vertexes expressed as `x~n`: start at vertex `x`
/// and follow first parents `n` times.
///
/// `Display` renders `x~n`; `Debug` additionally appends `(+batch_size)` when
/// `batch_size` is not 1.
#[derive(Clone)]
pub struct AncestorPath {
    /// Name of the starting vertex.
    pub x: VertexName,

    /// Number of first-parent steps to take from `x`.
    pub n: u64,

    /// Number of consecutive vertexes covered by this path: `x~n`, then the
    /// first parent of that vertex, and so on. A value of 1 means only `x~n`.
    pub batch_size: u64,
}
80
81impl fmt::Display for AncestorPath {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 write!(f, "{:?}~{}", self.x, self.n)
84 }
85}
86
87impl fmt::Debug for AncestorPath {
88 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89 write!(f, "{}", self)?;
90 if self.batch_size != 1 {
91 write!(f, "(+{})", self.batch_size)?;
92 }
93 Ok(())
94 }
95}
96
/// Abstraction over the protocol used to translate between vertex names and
/// `x~n` ancestor paths when the answer is not available locally.
///
/// Both resolve methods return `(AncestorPath, Vec<VertexName>)` pairs, the
/// same shape stored in `ResponseIdNamePair::path_names`.
///
/// The unit type `()` implements this trait in this file as a protocol with
/// no remote configured.
#[async_trait::async_trait]
pub trait RemoteIdConvertProtocol: Send + Sync + 'static {
    /// Resolve `names` into ancestor paths, using `heads` as the reference
    /// heads for the lookup.
    ///
    /// NOTE(review): the local `Process<RequestNameToLocation, ResponseIdNamePair>`
    /// impl in this file skips names that are unknown or not ancestors of
    /// `heads` instead of failing; implementations are presumably expected to
    /// behave the same way — confirm against callers.
    async fn resolve_names_to_relative_paths(
        &self,
        heads: Vec<VertexName>,
        names: Vec<VertexName>,
    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>>;

    /// Resolve ancestor paths back into the vertex names they point at.
    ///
    /// NOTE(review): the local `Process<RequestLocationToName, ResponseIdNamePair>`
    /// impl in this file returns `path.batch_size` names per path and fails
    /// on any unresolvable path — presumably the expected contract; confirm.
    async fn resolve_relative_paths_to_names(
        &self,
        paths: Vec<AncestorPath>,
    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>>;

    /// Whether this protocol is local. Defaults to `false`; the `()` impl in
    /// this file overrides it to `true`.
    fn is_local(&self) -> bool {
        false
    }
}
132
133#[async_trait::async_trait]
134impl RemoteIdConvertProtocol for () {
135 async fn resolve_names_to_relative_paths(
136 &self,
137 _heads: Vec<VertexName>,
138 _names: Vec<VertexName>,
139 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
140 Ok(Default::default())
141 }
142
143 async fn resolve_relative_paths_to_names(
144 &self,
145 paths: Vec<AncestorPath>,
146 ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
147 let msg = format!(
148 "Asked to resolve {:?} in graph but remote protocol is not configured",
149 paths
150 );
151 crate::errors::programming(msg)
152 }
153
154 fn is_local(&self) -> bool {
155 true
156 }
157}
158
/// Conversion step that turns an input `I` into an output `O`.
///
/// In this file it is implemented on tuples of an id map and an `IdDag`,
/// which supply the graph context needed to build requests, answer them, or
/// apply responses.
#[async_trait::async_trait]
pub(crate) trait Process<I, O> {
    /// Consume `self` and convert `input` into `O`, or return an error.
    async fn process(self, input: I) -> Result<O>;
}
171
172#[async_trait::async_trait]
177impl<M: IdConvert, DagStore: IdDagStore> Process<Vec<VertexName>, RequestNameToLocation>
178 for (&M, &IdDag<DagStore>)
179{
180 async fn process(self, names: Vec<VertexName>) -> Result<RequestNameToLocation> {
181 let map = &self.0;
182 let dag = &self.1;
183 let heads = stream::iter(dag.heads_ancestors(dag.master_group()?)?.into_iter()).boxed();
186 let heads = heads
187 .then(|id| map.vertex_name(id))
188 .try_collect::<Vec<VertexName>>()
189 .await
190 .map_err(|e| {
191 let msg = format!(
192 concat!(
193 "Cannot resolve heads in master group to vertex name. ",
194 "The vertex name is required for remote vertex resolution. ",
195 "This probably indicates the Dag update logic does not ensure the ",
196 "vertex name of heads exist as it should. ",
197 "(Error: {})",
198 ),
199 e
200 );
201 crate::Error::Programming(msg)
202 })?;
203 Ok(RequestNameToLocation { names, heads })
204 }
205}
206
207#[async_trait::async_trait]
210impl<M: IdConvert, DagStore: IdDagStore> Process<IdSet, RequestLocationToName>
211 for (&M, &IdDag<DagStore>)
212{
213 async fn process(self, ids: IdSet) -> Result<RequestLocationToName> {
214 let map = &self.0;
215 let dag = &self.1;
216 let heads = dag.heads_ancestors(dag.master_group()?)?;
217
218 let mut id_path: Vec<(Id, u64, u64)> = Vec::with_capacity(ids.as_spans().len());
219 let mut last_id_opt = None;
220 for id in ids.into_iter() {
221 if let Some(last_id) = last_id_opt {
222 if dag.try_first_ancestor_nth(last_id, 1)? == Some(id) {
223 if let Some(last) = id_path.last_mut() {
225 last.2 += 1;
226 last_id_opt = Some(id);
227 continue;
228 }
229 }
230 }
231 let (x, n) = dag
232 .to_first_ancestor_nth(
233 id,
234 FirstAncestorConstraint::KnownUniversally {
235 heads: heads.clone(),
236 },
237 )?
238 .ok_or_else(|| {
239 if id.group() == Group::MASTER {
240 let msg = format!(
241 concat!(
242 "Cannot convert {} to x~n form using heads {:?}. ",
243 "This is unexpected. It indicates some serious bugs in graph ",
244 "calculation or the graph is corrupted (ex. has cycles).",
245 ),
246 id, &heads,
247 );
248 crate::Error::Bug(msg)
249 } else {
250 let msg = format!(
251 concat!(
252 "Cannot convert {} to x~n form. This is unexpected for non-master ",
253 "vertexes since they are expected to be non-lazy.",
254 ),
255 id
256 );
257 crate::Error::Programming(msg)
258 }
259 })?;
260 id_path.push((x, n, 1));
261 last_id_opt = Some(id);
262 }
263
264 let paths = stream::iter(id_path)
265 .then(|(x, n, batch_size)| async move {
266 let x = map.vertex_name(x).await.map_err(|e| {
267 let msg = format!(
268 concat!(
269 "Cannot resolve {} in to vertex name (Error: {}). ",
270 "The vertex name is required for remote vertex resolution. ",
271 "This probably indicates the Dag clone or update logic does ",
272 "not maintain \"universally known\" vertexes as it should.",
273 ),
274 x, e,
275 );
276 crate::Error::Programming(msg)
277 })?;
278 Ok::<_, crate::Error>(AncestorPath { x, n, batch_size })
279 })
280 .try_collect::<Vec<_>>()
281 .await?;
282
283 Ok(RequestLocationToName { paths })
284 }
285}
286
287#[async_trait::async_trait]
290impl<M: IdConvert, DagStore: IdDagStore> Process<RequestNameToLocation, ResponseIdNamePair>
291 for (&M, &IdDag<DagStore>)
292{
293 async fn process(self, request: RequestNameToLocation) -> Result<ResponseIdNamePair> {
294 let map = &self.0;
295 let dag = &self.1;
296
297 let heads: IdSet = {
298 let heads = stream::iter(request.heads.into_iter());
299 let heads = heads
300 .then(|s| map.vertex_id(s))
301 .try_collect::<Vec<Id>>()
302 .await?;
303 IdSet::from_spans(heads)
304 };
305 let resolvable = dag.ancestors(heads.clone())?;
306
307 let id_names: Vec<(Id, VertexName)> = {
308 let ids_result = map.vertex_id_batch(&request.names).await?;
309 let mut id_names = Vec::with_capacity(ids_result.len());
310 for (name, id_result) in request.names.into_iter().zip(ids_result) {
311 match id_result {
312 Err(crate::Error::VertexNotFound(n)) => {
314 tracing::trace!(
315 "RequestNameToLocation -> ResponseIdNamePair: skip unknown name {:?}",
316 &n
317 );
318 continue;
319 }
320 Err(e) => {
321 return Err(e);
322 }
323 Ok(id) => {
324 if resolvable.contains(id) {
325 id_names.push((id, name))
326 }
327 }
328 }
329 }
330 id_names
331 };
332
333 let path_names: Vec<(AncestorPath, Vec<VertexName>)> = {
334 let x_n_names: Vec<(Id, u64, VertexName)> = id_names
335 .into_iter()
336 .filter_map(|(id, name)| {
337 match dag.to_first_ancestor_nth(
338 id,
339 FirstAncestorConstraint::KnownUniversally {
340 heads: heads.clone(),
341 },
342 ) {
343 Err(e) => Some(Err(e)),
344 Ok(None) => None,
346 Ok(Some((x, n))) => Some(Ok((x, n, name))),
347 }
348 })
349 .collect::<Result<Vec<_>>>()?;
350
351 stream::iter(x_n_names)
353 .then(|(x, n, name)| async move {
354 let x = map.vertex_name(x).await?;
355 Ok::<_, crate::Error>((
356 AncestorPath {
357 x,
358 n,
359 batch_size: 1,
360 },
361 vec![name],
362 ))
363 })
364 .try_collect()
365 .await?
366 };
367
368 Ok(ResponseIdNamePair { path_names })
369 }
370}
371
372#[async_trait::async_trait]
375impl<M: IdConvert, DagStore: IdDagStore> Process<RequestLocationToName, ResponseIdNamePair>
376 for (&M, &IdDag<DagStore>)
377{
378 async fn process(self, request: RequestLocationToName) -> Result<ResponseIdNamePair> {
379 let map = &self.0;
380 let dag = &self.1;
381
382 let path_names: Vec<(AncestorPath, Vec<VertexName>)> =
383 stream::iter(request.paths.into_iter())
384 .then(|path| async move {
385 let id = map.vertex_id(path.x.clone()).await?;
386 let mut id = dag.first_ancestor_nth(id, path.n)?;
387 let mut ids = Vec::with_capacity(path.batch_size as _);
388 for i in 0..path.batch_size {
389 if i > 0 {
390 id = dag.first_ancestor_nth(id, 1)?;
391 }
392 ids.push(id);
393 }
394 let fallible_names = map.vertex_name_batch(&ids).await?;
395 let mut names = Vec::with_capacity(fallible_names.len());
396 for name in fallible_names {
397 names.push(name?);
398 }
399 debug_assert_eq!(path.batch_size, names.len() as u64);
400 Ok::<_, crate::Error>((path, names))
401 })
402 .try_collect()
403 .await?;
404 Ok(ResponseIdNamePair { path_names })
405 }
406}
407
/// Apply a `ResponseIdNamePair` by inserting its id-name pairs into the `IdMap`.
#[cfg(any(test, feature = "indexedlog-backend"))]
#[async_trait::async_trait]
impl<'a, DagStore: IdDagStore> Process<ResponseIdNamePair, ()>
    for (&'a mut IdMap, &'a IdDag<DagStore>)
{
    /// For each `(path, names)` entry, `names[0]` is stored for the vertex at
    /// `path.x~path.n` and every following name for the first parent of the
    /// previous vertex. Fails if `path.x` is not in the map or the graph walk
    /// fails.
    async fn process(self, res: ResponseIdNamePair) -> Result<()> {
        use crate::errors::NotFoundError;

        let (map, dag) = self;
        for (path, names) in &res.path_names {
            let start: Id = map
                .find_id_by_name(path.x.as_ref())?
                .ok_or_else(|| path.x.not_found_error())?;
            let mut current = dag.first_ancestor_nth(start, path.n)?;
            tracing::trace!(
                "insert path {:?} names {:?} (x = {})",
                &path,
                &names,
                current
            );
            for (index, name) in names.iter().enumerate() {
                // Every name after the first belongs to the next first parent.
                if index != 0 {
                    current = dag.first_ancestor_nth(current, 1)?;
                }
                tracing::trace!(" insert {:?} = {:?}", current, &name);
                map.insert(current, name.as_ref())?;
            }
        }
        Ok(())
    }
}
437
thread_local! {
    /// Per-thread nesting depth of active `disable_remote_protocol` calls.
    /// Any non-zero value means the remote protocol is reported as disabled
    /// on this thread.
    static NON_BLOCKING_DEPTH: RefCell<usize> = RefCell::new(0);
}

/// Run `f` with the remote protocol marked as disabled on the current thread
/// and return its result.
///
/// Calls may be nested; the protocol is reported as disabled until the
/// outermost call finishes. The marker is restored even if `f` panics, so a
/// caught panic cannot leave the thread permanently "disabled".
pub(crate) fn disable_remote_protocol<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Undoes the increment below when dropped, on normal return and on unwind.
    struct DepthGuard;

    impl Drop for DepthGuard {
        fn drop(&mut self) {
            NON_BLOCKING_DEPTH.with(|v| *v.borrow_mut() -= 1);
        }
    }

    NON_BLOCKING_DEPTH.with(|v| *v.borrow_mut() += 1);
    let _guard = DepthGuard;
    f()
}

/// Whether the current thread is inside at least one
/// `disable_remote_protocol` call.
pub(crate) fn is_remote_protocol_disabled() -> bool {
    NON_BLOCKING_DEPTH.with(|v| *v.borrow() != 0)
}