dag/
protocol.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! # protocol
9//!
10//! Abstractions used for communication between `(sparse_idmap1, segments1)`
11//! (usually, a client) and `(complete_idmap2, segments2)` (usually, a server).
12//!
13//! When the sparse idmap gets asked to convert unknown id or name, it goes
14//! through the following flow to find the answer:
15//!
16//! - Id -> Name: Id -> RequestLocationToName -> ResponseIdNamePair -> Name
17//! - Name -> Id: Name -> RequestNameToLocation -> ResponseIdNamePair -> Id
18
19use std::cell::RefCell;
20use std::fmt;
21use std::thread_local;
22
23use futures::stream;
24use futures::stream::StreamExt;
25use futures::stream::TryStreamExt;
26
27use crate::id::VertexName;
28use crate::iddag::FirstAncestorConstraint;
29use crate::iddag::IdDag;
30use crate::iddagstore::IdDagStore;
31use crate::ops::IdConvert;
32use crate::Group;
33use crate::Id;
34#[cfg(any(test, feature = "indexedlog-backend"))]
35use crate::IdMap;
36use crate::IdSet;
37use crate::Result;
38
39// Request and Response structures -------------------------------------------
40
41/// Request for locating names (commit hashes) in a IdDag.
42/// Useful for converting names to ids.
43#[derive(Debug, Clone)]
44pub struct RequestNameToLocation {
45    pub names: Vec<VertexName>,
46    pub heads: Vec<VertexName>,
47}
48
49/// Request for converting locations to names (commit hashes).
50/// Useful for converting ids to names.
51#[derive(Debug, Clone)]
52pub struct RequestLocationToName {
53    pub paths: Vec<AncestorPath>,
54}
55
56/// Response for converting names to ids or converting names to ids.
57#[derive(Debug, Clone)]
58pub struct ResponseIdNamePair {
59    // For converting Id -> Name, the client provides AncestorPath, the server provides
60    // Vec<Box<[u8]>>.
61    //
62    // For converting Name -> Id, the client provides Box<[u8]>, the server provides
63    // AncestorPath.
64    pub path_names: Vec<(AncestorPath, Vec<VertexName>)>,
65}
66
67/// The `n`-th first ancestor of `x`. `x~n` in hg revset syntax.
68/// Usually, `x` is commonly known by the client and the server.
69///
70/// This can be seen as a kind of "location".
71#[derive(Clone)]
72pub struct AncestorPath {
73    pub x: VertexName,
74
75    pub n: u64,
76
77    // Starting from x~n, get a chain of commits following p1.
78    pub batch_size: u64,
79}
80
81impl fmt::Display for AncestorPath {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        write!(f, "{:?}~{}", self.x, self.n)
84    }
85}
86
87impl fmt::Debug for AncestorPath {
88    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89        write!(f, "{}", self)?;
90        if self.batch_size != 1 {
91            write!(f, "(+{})", self.batch_size)?;
92        }
93        Ok(())
94    }
95}
96
97// Async Remote Protocols ----------------------------------------------------
98
99/// Abstraction of network protocols.
100#[async_trait::async_trait]
101pub trait RemoteIdConvertProtocol: Send + Sync + 'static {
102    /// Ask the server to convert names to "x~n" relative paths.
103    ///
104    /// If a "name" cannot be resolved using "x~n" form in "::heads", aka. the
105    /// "heads" are known to the server, and the server can calculate "::heads",
106    /// and knows all names (commit hashes) in "::heads". And the server
107    /// confirms "name" is outside "::heads" (either because "name" is unknown
108    /// to the server's IdMap, or because "name" is known in the server's IdMap,
109    /// but the matching Id is outside "::heads"), this method should skip it in
110    /// the resulting list (instead of returning an error).
111    async fn resolve_names_to_relative_paths(
112        &self,
113        heads: Vec<VertexName>,
114        names: Vec<VertexName>,
115    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>>;
116
117    /// Ask the server to convert "x~n" relative paths back to commit hashes.
118    ///
119    /// Unlike resolve_names_to_relative_paths, failures are not expected.
120    /// They usually indicate rare events like master moving backwards.
121    async fn resolve_relative_paths_to_names(
122        &self,
123        paths: Vec<AncestorPath>,
124    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>>;
125
126    /// Return `true` if the protocol is local and queries do not need to
127    /// optimize for batching or latency.
128    fn is_local(&self) -> bool {
129        false
130    }
131}
132
133#[async_trait::async_trait]
134impl RemoteIdConvertProtocol for () {
135    async fn resolve_names_to_relative_paths(
136        &self,
137        _heads: Vec<VertexName>,
138        _names: Vec<VertexName>,
139    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
140        Ok(Default::default())
141    }
142
143    async fn resolve_relative_paths_to_names(
144        &self,
145        paths: Vec<AncestorPath>,
146    ) -> Result<Vec<(AncestorPath, Vec<VertexName>)>> {
147        let msg = format!(
148            "Asked to resolve {:?} in graph but remote protocol is not configured",
149            paths
150        );
151        crate::errors::programming(msg)
152    }
153
154    fn is_local(&self) -> bool {
155        true
156    }
157}
158
159// Traits --------------------------------------------------------------------
160
161/// Similar to `From::from(I) -> O`, but with `self` as context.
162///
163/// Example use-cases:
164/// - Convert a query to a request (client-side).
165/// - Convert a request to a response (server-side).
166/// - Handle a response from the server (client-side).
167#[async_trait::async_trait]
168pub(crate) trait Process<I, O> {
169    async fn process(self, input: I) -> Result<O>;
170}
171
172// Basic implementation ------------------------------------------------------
173
174// Name -> Id, step 1: Name -> RequestNameToLocation
175// Works on an incomplete IdMap, client-side.
176#[async_trait::async_trait]
177impl<M: IdConvert, DagStore: IdDagStore> Process<Vec<VertexName>, RequestNameToLocation>
178    for (&M, &IdDag<DagStore>)
179{
180    async fn process(self, names: Vec<VertexName>) -> Result<RequestNameToLocation> {
181        let map = &self.0;
182        let dag = &self.1;
183        // Only provides heads in the master group, since it's expected that the
184        // non-master group is already locally known.
185        let heads = stream::iter(dag.heads_ancestors(dag.master_group()?)?.into_iter()).boxed();
186        let heads = heads
187            .then(|id| map.vertex_name(id))
188            .try_collect::<Vec<VertexName>>()
189            .await
190            .map_err(|e| {
191                let msg = format!(
192                    concat!(
193                        "Cannot resolve heads in master group to vertex name. ",
194                        "The vertex name is required for remote vertex resolution. ",
195                        "This probably indicates the Dag update logic does not ensure the ",
196                        "vertex name of heads exist as it should. ",
197                        "(Error: {})",
198                    ),
199                    e
200                );
201                crate::Error::Programming(msg)
202            })?;
203        Ok(RequestNameToLocation { names, heads })
204    }
205}
206
207// Id -> Name, step 1: Id -> RequestLocationToName
208// Works on an incomplete IdMap, client-side.
209#[async_trait::async_trait]
210impl<M: IdConvert, DagStore: IdDagStore> Process<IdSet, RequestLocationToName>
211    for (&M, &IdDag<DagStore>)
212{
213    async fn process(self, ids: IdSet) -> Result<RequestLocationToName> {
214        let map = &self.0;
215        let dag = &self.1;
216        let heads = dag.heads_ancestors(dag.master_group()?)?;
217
218        let mut id_path: Vec<(Id, u64, u64)> = Vec::with_capacity(ids.as_spans().len());
219        let mut last_id_opt = None;
220        for id in ids.into_iter() {
221            if let Some(last_id) = last_id_opt {
222                if dag.try_first_ancestor_nth(last_id, 1)? == Some(id) {
223                    // Reuse the last path.
224                    if let Some(last) = id_path.last_mut() {
225                        last.2 += 1;
226                        last_id_opt = Some(id);
227                        continue;
228                    }
229                }
230            }
231            let (x, n) = dag
232                .to_first_ancestor_nth(
233                    id,
234                    FirstAncestorConstraint::KnownUniversally {
235                        heads: heads.clone(),
236                    },
237                )?
238                .ok_or_else(|| {
239                    if id.group() == Group::MASTER {
240                        let msg = format!(
241                            concat!(
242                                "Cannot convert {} to x~n form using heads {:?}. ",
243                                "This is unexpected. It indicates some serious bugs in graph ",
244                                "calculation or the graph is corrupted (ex. has cycles).",
245                            ),
246                            id, &heads,
247                        );
248                        crate::Error::Bug(msg)
249                    } else {
250                        let msg = format!(
251                            concat!(
252                                "Cannot convert {} to x~n form. This is unexpected for non-master ",
253                                "vertexes since they are expected to be non-lazy.",
254                            ),
255                            id
256                        );
257                        crate::Error::Programming(msg)
258                    }
259                })?;
260            id_path.push((x, n, 1));
261            last_id_opt = Some(id);
262        }
263
264        let paths = stream::iter(id_path)
265            .then(|(x, n, batch_size)| async move {
266                let x = map.vertex_name(x).await.map_err(|e| {
267                    let msg = format!(
268                        concat!(
269                            "Cannot resolve {} in to vertex name (Error: {}). ",
270                            "The vertex name is required for remote vertex resolution. ",
271                            "This probably indicates the Dag clone or update logic does ",
272                            "not maintain \"universally known\" vertexes as it should.",
273                        ),
274                        x, e,
275                    );
276                    crate::Error::Programming(msg)
277                })?;
278                Ok::<_, crate::Error>(AncestorPath { x, n, batch_size })
279            })
280            .try_collect::<Vec<_>>()
281            .await?;
282
283        Ok(RequestLocationToName { paths })
284    }
285}
286
287// Name -> Id, step 2: RequestNameToLocation -> ResponseIdNamePair
288// Works on a complete IdMap, server-side.
289#[async_trait::async_trait]
290impl<M: IdConvert, DagStore: IdDagStore> Process<RequestNameToLocation, ResponseIdNamePair>
291    for (&M, &IdDag<DagStore>)
292{
293    async fn process(self, request: RequestNameToLocation) -> Result<ResponseIdNamePair> {
294        let map = &self.0;
295        let dag = &self.1;
296
297        let heads: IdSet = {
298            let heads = stream::iter(request.heads.into_iter());
299            let heads = heads
300                .then(|s| map.vertex_id(s))
301                .try_collect::<Vec<Id>>()
302                .await?;
303            IdSet::from_spans(heads)
304        };
305        let resolvable = dag.ancestors(heads.clone())?;
306
307        let id_names: Vec<(Id, VertexName)> = {
308            let ids_result = map.vertex_id_batch(&request.names).await?;
309            let mut id_names = Vec::with_capacity(ids_result.len());
310            for (name, id_result) in request.names.into_iter().zip(ids_result) {
311                match id_result {
312                    // If one of the names cannot be resolved to id, just skip it.
313                    Err(crate::Error::VertexNotFound(n)) => {
314                        tracing::trace!(
315                            "RequestNameToLocation -> ResponseIdNamePair: skip unknown name {:?}",
316                            &n
317                        );
318                        continue;
319                    }
320                    Err(e) => {
321                        return Err(e);
322                    }
323                    Ok(id) => {
324                        if resolvable.contains(id) {
325                            id_names.push((id, name))
326                        }
327                    }
328                }
329            }
330            id_names
331        };
332
333        let path_names: Vec<(AncestorPath, Vec<VertexName>)> = {
334            let x_n_names: Vec<(Id, u64, VertexName)> = id_names
335                .into_iter()
336                .filter_map(|(id, name)| {
337                    match dag.to_first_ancestor_nth(
338                        id,
339                        FirstAncestorConstraint::KnownUniversally {
340                            heads: heads.clone(),
341                        },
342                    ) {
343                        Err(e) => Some(Err(e)),
344                        // Skip ids that cannot be translated.
345                        Ok(None) => None,
346                        Ok(Some((x, n))) => Some(Ok((x, n, name))),
347                    }
348                })
349                .collect::<Result<Vec<_>>>()?;
350
351            // Convert x from Id to VertexName.
352            stream::iter(x_n_names)
353                .then(|(x, n, name)| async move {
354                    let x = map.vertex_name(x).await?;
355                    Ok::<_, crate::Error>((
356                        AncestorPath {
357                            x,
358                            n,
359                            batch_size: 1,
360                        },
361                        vec![name],
362                    ))
363                })
364                .try_collect()
365                .await?
366        };
367
368        Ok(ResponseIdNamePair { path_names })
369    }
370}
371
372// Id -> Name, step 2: RequestLocationToName -> ResponseIdNamePair
373// Works on a complete IdMap, server-side.
374#[async_trait::async_trait]
375impl<M: IdConvert, DagStore: IdDagStore> Process<RequestLocationToName, ResponseIdNamePair>
376    for (&M, &IdDag<DagStore>)
377{
378    async fn process(self, request: RequestLocationToName) -> Result<ResponseIdNamePair> {
379        let map = &self.0;
380        let dag = &self.1;
381
382        let path_names: Vec<(AncestorPath, Vec<VertexName>)> =
383            stream::iter(request.paths.into_iter())
384                .then(|path| async move {
385                    let id = map.vertex_id(path.x.clone()).await?;
386                    let mut id = dag.first_ancestor_nth(id, path.n)?;
387                    let mut ids = Vec::with_capacity(path.batch_size as _);
388                    for i in 0..path.batch_size {
389                        if i > 0 {
390                            id = dag.first_ancestor_nth(id, 1)?;
391                        }
392                        ids.push(id);
393                    }
394                    let fallible_names = map.vertex_name_batch(&ids).await?;
395                    let mut names = Vec::with_capacity(fallible_names.len());
396                    for name in fallible_names {
397                        names.push(name?);
398                    }
399                    debug_assert_eq!(path.batch_size, names.len() as u64);
400                    Ok::<_, crate::Error>((path, names))
401                })
402                .try_collect()
403                .await?;
404        Ok(ResponseIdNamePair { path_names })
405    }
406}
407
// Name -> Id or Id -> Name, step 3: Apply ResponseIdNamePair to a local IdMap.
// Works on an incomplete IdMap, client-side.
#[cfg(any(test, feature = "indexedlog-backend"))]
#[async_trait::async_trait]
impl<'a, DagStore: IdDagStore> Process<ResponseIdNamePair, ()>
    for (&'a mut IdMap, &'a IdDag<DagStore>)
{
    /// Insert the names carried by `res` into the local IdMap.
    ///
    /// For every `(path, names)` entry, `path.x` must already be in the local
    /// IdMap, otherwise a not-found error is returned. The first name is
    /// assigned to `x~n`, and each following name to the first ancestor (p1)
    /// of the previously assigned id.
    async fn process(mut self, res: ResponseIdNamePair) -> Result<()> {
        use crate::errors::NotFoundError;

        let idmap = &mut self.0;
        let iddag = &self.1;
        for (path, names) in &res.path_names {
            let x: Id = idmap
                .find_id_by_name(path.x.as_ref())?
                .ok_or_else(|| path.x.not_found_error())?;
            let mut id = iddag.first_ancestor_nth(x, path.n)?;
            tracing::trace!("insert path {:?} names {:?} (x = {})", &path, &names, id);
            for (step, name) in names.iter().enumerate() {
                if step != 0 {
                    id = iddag.first_ancestor_nth(id, 1)?;
                }
                tracing::trace!(" insert {:?} = {:?}", id, &name);
                idmap.insert(id, name.as_ref())?;
            }
        }
        Ok(())
    }
}
437
438// Disable remote protocol temporarily ---------------------------------------
439// This can be useful for Debug::fmt to disable remote fetching which might
440// panic (ex. calling tokio without tokio runtime) when executing futures
441// via nonblocking.
442
thread_local! {
    /// Nesting depth of `disable_remote_protocol` calls on this thread.
    /// Non-zero means remote fetching is disabled.
    static NON_BLOCKING_DEPTH: RefCell<usize> = RefCell::new(0);
}

/// Run `f` with the remote protocol disabled on the current thread.
///
/// Calls may nest; the protocol stays disabled until the outermost call
/// returns. The depth counter is restored even if `f` panics, so a panic
/// caught further up the stack (e.g. via `catch_unwind`) does not leave the
/// remote protocol permanently disabled on this thread.
pub(crate) fn disable_remote_protocol<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    /// Decrements `NON_BLOCKING_DEPTH` when dropped, including during unwinding.
    struct DepthGuard;

    impl Drop for DepthGuard {
        fn drop(&mut self) {
            // `try_with` instead of `with`: never panic inside `drop`, since a
            // panic while already unwinding aborts. `with` can panic if the
            // thread-local has already been destroyed.
            let _ = NON_BLOCKING_DEPTH.try_with(|v| *v.borrow_mut() -= 1);
        }
    }

    NON_BLOCKING_DEPTH.with(|v| *v.borrow_mut() += 1);
    let _guard = DepthGuard;
    f()
}

/// Return `true` if the current thread is running inside `disable_remote_protocol`.
pub(crate) fn is_remote_protocol_disabled() -> bool {
    NON_BLOCKING_DEPTH.with(|v| *v.borrow() != 0)
}