swh_graph_grpc_server/
lib.rs

1// Copyright (C) 2023-2024  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::collections::HashMap;
7use std::io::Read;
8use std::path::Path;
9use std::str::FromStr;
10use std::sync::Arc;
11
12use cadence::StatsdClient;
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::transport::Server;
15use tonic::{Request, Response};
16use tonic_middleware::MiddlewareFor;
17use tracing::{instrument, Level};
18
19use swh_graph::properties::NodeIdFromSwhidError;
20use swh_graph::views::Subgraph;
21
22pub mod proto {
23    tonic::include_proto!("swh.graph");
24
25    pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
26        tonic::include_file_descriptor_set!("swhgraph_descriptor");
27}
28
29use proto::traversal_service_server::TraversalServiceServer;
30
31mod filters;
32mod find_path;
33pub mod graph;
34pub mod metrics;
35mod node_builder;
36#[cfg(feature = "sentry")]
37pub mod sentry;
38pub mod statsd;
39mod traversal;
40mod utils;
41pub mod visitor;
42
43use graph::SwhOptFullGraph;
44
45/// Runs a long-running function in a separate thread so it does not block.
46///
47/// This differs from [`tokio::task::spawn_blocking`] in that the closure does not
48/// need to be `'static` (so it can borrow from its scope)
49pub(crate) fn scoped_spawn_blocking<R: Send + Sync + 'static, F: FnOnce() -> R + Send>(f: F) -> R {
50    let ((), mut outputs): ((), Vec<Result<R, tokio::task::JoinError>>) =
51        async_scoped::TokioScope::scope_and_block(|scope| scope.spawn_blocking(f));
52    assert_eq!(outputs.len(), 1, "Unexpected number of futures spawned");
53    outputs.pop().unwrap().expect("could not join task")
54}
55
56type TonicResult<T> = Result<tonic::Response<T>, tonic::Status>;
57
58pub struct TraversalService<G: SwhOptFullGraph + Clone + Send + Sync + 'static> {
59    graph: G,
60    pub statsd_client: Option<Arc<StatsdClient>>,
61}
62
63impl<G: SwhOptFullGraph + Clone + Send + Sync + 'static> TraversalService<G> {
64    pub fn new(graph: G, statsd_client: Option<Arc<StatsdClient>>) -> Self {
65        TraversalService {
66            graph,
67            statsd_client,
68        }
69    }
70}
71
72pub trait TraversalServiceTrait {
73    type Graph: SwhOptFullGraph + Clone + Send + Sync + 'static;
74    #[allow(clippy::result_large_err)] // implementation is short enough to be inlined
75    fn try_get_node_id(&self, swhid: &str) -> Result<usize, tonic::Status>;
76    fn graph(&self) -> &Self::Graph;
77    fn statsd_client(&self) -> Option<&Arc<StatsdClient>>;
78}
79
80impl<G: SwhOptFullGraph + Clone + Send + Sync + 'static> TraversalServiceTrait
81    for TraversalService<G>
82{
83    type Graph = G;
84
85    #[inline(always)]
86    fn try_get_node_id(&self, swhid: &str) -> Result<usize, tonic::Status> {
87        let node = self
88            .graph
89            .properties()
90            .node_id_from_string_swhid(swhid)
91            .map_err(|e| match e {
92                NodeIdFromSwhidError::InvalidSwhid(e) => {
93                    tonic::Status::invalid_argument(format!("Invalid SWHID: {e}"))
94                }
95                NodeIdFromSwhidError::UnknownSwhid(e) => {
96                    tonic::Status::not_found(format!("Unknown SWHID: {e}"))
97                }
98                NodeIdFromSwhidError::InternalError(e) => {
99                    tonic::Status::internal(format!("Internal error: {e}"))
100                }
101            })?;
102
103        if self.graph.has_node(node) {
104            Ok(node)
105        } else {
106            Err(tonic::Status::not_found(format!(
107                "Unavailable node: {swhid}"
108            )))
109        }
110    }
111
112    #[inline(always)]
113    fn graph(&self) -> &Self::Graph {
114        &self.graph
115    }
116
117    #[inline(always)]
118    fn statsd_client(&self) -> Option<&Arc<StatsdClient>> {
119        self.statsd_client.as_ref()
120    }
121}
122
123#[tonic::async_trait]
124impl<G: SwhOptFullGraph + Send + Sync + Clone + 'static>
125    proto::traversal_service_server::TraversalService for TraversalService<G>
126{
127    async fn get_node(&self, request: Request<proto::GetNodeRequest>) -> TonicResult<proto::Node> {
128        let arc_checker = filters::ArcFilterChecker::new(self.graph.clone(), None)?;
129        let subgraph = Arc::new(Subgraph::with_arc_filter(
130            self.graph.clone(),
131            move |src, dst| arc_checker.matches(src, dst),
132        ));
133        let proto::GetNodeRequest { swhid, mask } = request.get_ref().clone();
134        let node_builder = node_builder::NodeBuilder::new(
135            subgraph,
136            mask.map(|mask| prost_types::FieldMask {
137                paths: mask.paths.iter().map(|field| field.to_owned()).collect(),
138            }),
139        )?;
140        let node_id = self.try_get_node_id(&swhid)?;
141        Ok(Response::new(node_builder.build_node(node_id)))
142    }
143
144    type TraverseStream = ReceiverStream<Result<proto::Node, tonic::Status>>;
145    #[instrument(skip(self, request), err(level = Level::INFO))]
146    async fn traverse(
147        &self,
148        request: Request<proto::TraversalRequest>,
149    ) -> TonicResult<Self::TraverseStream> {
150        tracing::info!("{:?}", request.get_ref());
151        traversal::SimpleTraversal { service: self }
152            .traverse(request)
153            .await
154    }
155
156    #[instrument(skip(self, request), err(level = Level::INFO))]
157    async fn find_path_to(
158        &self,
159        request: Request<proto::FindPathToRequest>,
160    ) -> TonicResult<proto::Path> {
161        tracing::info!("{:?}", request.get_ref());
162        find_path::FindPath { service: self }
163            .find_path_to(request)
164            .await
165    }
166
167    #[instrument(skip(self, request), err(level = Level::INFO))]
168    async fn find_path_between(
169        &self,
170        request: Request<proto::FindPathBetweenRequest>,
171    ) -> TonicResult<proto::Path> {
172        tracing::info!("{:?}", request.get_ref());
173        find_path::FindPath { service: self }
174            .find_path_between(request)
175            .await
176    }
177
178    #[instrument(skip(self, request), err(level = Level::INFO))]
179    async fn count_nodes(
180        &self,
181        request: Request<proto::TraversalRequest>,
182    ) -> TonicResult<proto::CountResponse> {
183        tracing::info!("{:?}", request.get_ref());
184        traversal::SimpleTraversal { service: self }
185            .count_nodes(request)
186            .await
187    }
188
189    #[instrument(skip(self, request), err(level = Level::INFO))]
190    async fn count_edges(
191        &self,
192        request: Request<proto::TraversalRequest>,
193    ) -> TonicResult<proto::CountResponse> {
194        tracing::info!("{:?}", request.get_ref());
195        traversal::SimpleTraversal { service: self }
196            .count_edges(request)
197            .await
198    }
199
200    #[instrument(skip(self, request))]
201    async fn stats(
202        &self,
203        request: Request<proto::StatsRequest>,
204    ) -> TonicResult<proto::StatsResponse> {
205        tracing::info!("{:?}", request.get_ref());
206        // Load properties
207        let properties_path = self.graph.path().with_extension("properties");
208        let properties_path = properties_path.as_path();
209        let properties = load_properties(properties_path, ".stats")?;
210
211        // Load stats
212        let stats_path = self.graph.path().with_extension("stats");
213        let stats_path = stats_path.as_path();
214        let stats = load_properties(stats_path, ".stats")?;
215
216        // Load export metadata
217        let export_meta_path = self
218            .graph
219            .path()
220            .parent()
221            .ok_or_else(|| {
222                log::error!(
223                    "Could not get path to meta/export.json from {}",
224                    self.graph.path().display()
225                );
226                tonic::Status::internal("Could not find meta/export.json file")
227            })?
228            .join("meta")
229            .join("export.json");
230        let export_meta_path = export_meta_path.as_path();
231        let export_meta = load_export_meta(export_meta_path);
232        let export_meta = export_meta.as_ref();
233
234        Ok(Response::new(proto::StatsResponse {
235            num_nodes: self.graph.num_nodes() as i64,
236            num_edges: self.graph.num_arcs() as i64,
237            compression_ratio: get_property(&properties, properties_path, "compratio").ok(),
238            bits_per_node: get_property(&properties, properties_path, "bitspernode").ok(),
239            bits_per_edge: get_property(&properties, properties_path, "bitsperlink").ok(),
240            avg_locality: get_property(&stats, stats_path, "avglocality").ok(),
241            indegree_min: get_property(&stats, stats_path, "minindegree")?,
242            indegree_max: get_property(&stats, stats_path, "maxindegree")?,
243            indegree_avg: get_property(&stats, stats_path, "avgindegree")?,
244            outdegree_min: get_property(&stats, stats_path, "minoutdegree")?,
245            outdegree_max: get_property(&stats, stats_path, "maxoutdegree")?,
246            outdegree_avg: get_property(&stats, stats_path, "avgoutdegree")?,
247            export_started_at: export_meta.map(|export_meta| export_meta.export_start.timestamp()),
248            export_ended_at: export_meta.map(|export_meta| export_meta.export_end.timestamp()),
249            num_nodes_by_type: self
250                .graph
251                .num_nodes_by_type()
252                .unwrap_or_else(|e| {
253                    log::info!("Missing num_nodes_by_type: {}", e);
254                    HashMap::new()
255                })
256                .into_iter()
257                .map(|(type_, count)| {
258                    (
259                        format!("{type_}"),
260                        i64::try_from(count).expect("Node count overflowed i64"),
261                    )
262                })
263                .collect(),
264            num_arcs_by_type: self
265                .graph
266                .num_arcs_by_type()
267                .unwrap_or_else(|e| {
268                    log::info!("Missing num_arcs_by_type: {}", e);
269                    HashMap::new()
270                })
271                .into_iter()
272                .map(|((src_type, dst_type), count)| {
273                    (
274                        format!("{src_type}:{dst_type}"),
275                        i64::try_from(count).expect("Arc count overflowed i64"),
276                    )
277                })
278                .collect(),
279        }))
280    }
281}
282
/// Subset of the `meta/export.json` file that the `stats` handler reports.
///
/// Field names (and order) define how the JSON is deserialized; keys not listed here
/// are ignored because the struct does not opt into `deny_unknown_fields`.
#[derive(serde_derive::Deserialize)]
struct ExportMeta {
    /// When the export started (reported as `export_started_at`, a Unix timestamp).
    export_start: chrono::DateTime<chrono::Utc>,
    /// When the export ended (reported as `export_ended_at`, a Unix timestamp).
    export_end: chrono::DateTime<chrono::Utc>,
}
288fn load_export_meta(path: &Path) -> Option<ExportMeta> {
289    let file = std::fs::File::open(path)
290        .map_err(|e| {
291            log::error!("Could not open {}: {}", path.display(), e);
292        })
293        .ok()?;
294    let mut export_meta = String::new();
295    std::io::BufReader::new(file)
296        .read_to_string(&mut export_meta)
297        .map_err(|e| {
298            log::error!("Could not read {}: {}", path.display(), e);
299        })
300        .ok()?;
301    let export_meta = serde_json::from_str(&export_meta)
302        .map_err(|e| {
303            log::error!("Could not parse {}: {}", path.display(), e);
304        })
305        .ok()?;
306    Some(export_meta)
307}
308
309#[inline(always)]
310#[allow(clippy::result_large_err)] // it's inlined
311fn load_properties(path: &Path, suffix: &str) -> Result<HashMap<String, String>, tonic::Status> {
312    let file = std::fs::File::open(path).map_err(|e| {
313        log::error!("Could not open {}: {}", path.display(), e);
314        tonic::Status::internal(format!("Could not open {suffix} file"))
315    })?;
316    let properties = java_properties::read(std::io::BufReader::new(file)).map_err(|e| {
317        log::error!("Could not parse {}: {}", path.display(), e);
318        tonic::Status::internal(format!("Could not parse {suffix} file"))
319    })?;
320    Ok(properties)
321}
322
323#[inline(always)]
324#[allow(clippy::result_large_err)] // it's inlined
325fn get_property<V: FromStr>(
326    properties: &HashMap<String, String>,
327    properties_path: &Path,
328    name: &str,
329) -> Result<V, tonic::Status>
330where
331    <V as FromStr>::Err: std::fmt::Display,
332{
333    properties
334        .get(name)
335        .ok_or_else(|| {
336            log::error!("Missing {} in {}", name, properties_path.display());
337            tonic::Status::internal(format!("Could not read {name} from .properties"))
338        })?
339        .parse()
340        .map_err(|e| {
341            log::error!(
342                "Could not parse {} from {}",
343                name,
344                properties_path.display()
345            );
346            tonic::Status::internal(format!("Could not parse {name} from .properties: {e}"))
347        })
348}
349
350pub async fn serve<G: SwhOptFullGraph + Sync + Send + 'static>(
351    graph: G,
352    bind_addr: std::net::SocketAddr,
353    statsd_client: cadence::StatsdClient,
354) -> Result<(), tonic::transport::Error> {
355    let statsd_client = Arc::new(statsd_client);
356    let graph = Arc::new(graph);
357
358    let (health_reporter, health_service) = tonic_health::server::health_reporter();
359    health_reporter
360        .set_serving::<TraversalServiceServer<TraversalService<Arc<G>>>>()
361        .await;
362
363    #[cfg(not(feature = "sentry"))]
364    let mut builder = Server::builder();
365    #[cfg(feature = "sentry")]
366    let mut builder =
367        Server::builder().layer(::sentry::integrations::tower::NewSentryLayer::new_from_top());
368    builder
369        .add_service(MiddlewareFor::new(
370            TraversalServiceServer::new(TraversalService::new(graph, Some(statsd_client.clone()))),
371            metrics::MetricsMiddleware::new(statsd_client),
372        ))
373        .add_service(health_service)
374        .add_service(
375            tonic_reflection::server::Builder::configure()
376                .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
377                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
378                .build_v1()
379                .expect("Could not load v1 reflection service"),
380        )
381        .add_service(
382            tonic_reflection::server::Builder::configure()
383                .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
384                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
385                .build_v1alpha()
386                .expect("Could not load v1alpha reflection service"),
387        )
388        .serve(bind_addr)
389        .await?;
390
391    Ok(())
392}