1use std::collections::HashMap;
7use std::io::Read;
8use std::path::Path;
9use std::str::FromStr;
10use std::sync::Arc;
11
12use cadence::StatsdClient;
13use tokio_stream::wrappers::ReceiverStream;
14use tonic::transport::Server;
15use tonic::{Request, Response};
16use tonic_middleware::MiddlewareFor;
17use tracing::{instrument, Level};
18
19use swh_graph::properties::NodeIdFromSwhidError;
20use swh_graph::views::Subgraph;
21
/// Types and service definitions generated by tonic from the `swh.graph` protobuf package.
pub mod proto {
    tonic::include_proto!("swh.graph");

    /// Encoded file descriptor set of the service; `serve` registers it with the
    /// gRPC reflection services.
    pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
        tonic::include_file_descriptor_set!("swhgraph_descriptor");
}
28
29use proto::traversal_service_server::TraversalServiceServer;
30
31mod filters;
32mod find_path;
33pub mod graph;
34pub mod metrics;
35mod node_builder;
36#[cfg(feature = "sentry")]
37pub mod sentry;
38pub mod statsd;
39mod traversal;
40mod utils;
41pub mod visitor;
42
43use graph::SwhOptFullGraph;
44
45pub(crate) fn scoped_spawn_blocking<R: Send + Sync + 'static, F: FnOnce() -> R + Send>(f: F) -> R {
50 let ((), mut outputs): ((), Vec<Result<R, tokio::task::JoinError>>) =
51 async_scoped::TokioScope::scope_and_block(|scope| scope.spawn_blocking(f));
52 assert_eq!(outputs.len(), 1, "Unexpected number of futures spawned");
53 outputs.pop().unwrap().expect("could not join task")
54}
55
/// Return type of the gRPC handlers: a tonic response on success, a tonic status on failure.
type TonicResult<T> = Result<tonic::Response<T>, tonic::Status>;
57
58pub struct TraversalService<G: SwhOptFullGraph + Clone + Send + Sync + 'static> {
59 graph: G,
60 pub statsd_client: Option<Arc<StatsdClient>>,
61}
62
63impl<G: SwhOptFullGraph + Clone + Send + Sync + 'static> TraversalService<G> {
64 pub fn new(graph: G, statsd_client: Option<Arc<StatsdClient>>) -> Self {
65 TraversalService {
66 graph,
67 statsd_client,
68 }
69 }
70}
71
/// Accessors the request handlers rely on, independently of the concrete service type.
pub trait TraversalServiceTrait {
    /// Type of the graph the service answers requests from.
    type Graph: SwhOptFullGraph + Clone + Send + Sync + 'static;
    /// Resolves a textual SWHID to a node id, or returns the gRPC status to send
    /// back to the client when it cannot be resolved.
    #[allow(clippy::result_large_err)]
    fn try_get_node_id(&self, swhid: &str) -> Result<usize, tonic::Status>;
    /// Returns the graph the service answers requests from.
    fn graph(&self) -> &Self::Graph;
    /// Returns the StatsD client, if one was configured.
    fn statsd_client(&self) -> Option<&Arc<StatsdClient>>;
}
79
80impl<G: SwhOptFullGraph + Clone + Send + Sync + 'static> TraversalServiceTrait
81 for TraversalService<G>
82{
83 type Graph = G;
84
85 #[inline(always)]
86 fn try_get_node_id(&self, swhid: &str) -> Result<usize, tonic::Status> {
87 let node = self
88 .graph
89 .properties()
90 .node_id_from_string_swhid(swhid)
91 .map_err(|e| match e {
92 NodeIdFromSwhidError::InvalidSwhid(e) => {
93 tonic::Status::invalid_argument(format!("Invalid SWHID: {e}"))
94 }
95 NodeIdFromSwhidError::UnknownSwhid(e) => {
96 tonic::Status::not_found(format!("Unknown SWHID: {e}"))
97 }
98 NodeIdFromSwhidError::InternalError(e) => {
99 tonic::Status::internal(format!("Internal error: {e}"))
100 }
101 })?;
102
103 if self.graph.has_node(node) {
104 Ok(node)
105 } else {
106 Err(tonic::Status::not_found(format!(
107 "Unavailable node: {swhid}"
108 )))
109 }
110 }
111
112 #[inline(always)]
113 fn graph(&self) -> &Self::Graph {
114 &self.graph
115 }
116
117 #[inline(always)]
118 fn statsd_client(&self) -> Option<&Arc<StatsdClient>> {
119 self.statsd_client.as_ref()
120 }
121}
122
123#[tonic::async_trait]
124impl<G: SwhOptFullGraph + Send + Sync + Clone + 'static>
125 proto::traversal_service_server::TraversalService for TraversalService<G>
126{
127 async fn get_node(&self, request: Request<proto::GetNodeRequest>) -> TonicResult<proto::Node> {
128 let arc_checker = filters::ArcFilterChecker::new(self.graph.clone(), None)?;
129 let subgraph = Arc::new(Subgraph::with_arc_filter(
130 self.graph.clone(),
131 move |src, dst| arc_checker.matches(src, dst),
132 ));
133 let proto::GetNodeRequest { swhid, mask } = request.get_ref().clone();
134 let node_builder = node_builder::NodeBuilder::new(
135 subgraph,
136 mask.map(|mask| prost_types::FieldMask {
137 paths: mask.paths.iter().map(|field| field.to_owned()).collect(),
138 }),
139 )?;
140 let node_id = self.try_get_node_id(&swhid)?;
141 Ok(Response::new(node_builder.build_node(node_id)))
142 }
143
144 type TraverseStream = ReceiverStream<Result<proto::Node, tonic::Status>>;
145 #[instrument(skip(self, request), err(level = Level::INFO))]
146 async fn traverse(
147 &self,
148 request: Request<proto::TraversalRequest>,
149 ) -> TonicResult<Self::TraverseStream> {
150 tracing::info!("{:?}", request.get_ref());
151 traversal::SimpleTraversal { service: self }
152 .traverse(request)
153 .await
154 }
155
156 #[instrument(skip(self, request), err(level = Level::INFO))]
157 async fn find_path_to(
158 &self,
159 request: Request<proto::FindPathToRequest>,
160 ) -> TonicResult<proto::Path> {
161 tracing::info!("{:?}", request.get_ref());
162 find_path::FindPath { service: self }
163 .find_path_to(request)
164 .await
165 }
166
167 #[instrument(skip(self, request), err(level = Level::INFO))]
168 async fn find_path_between(
169 &self,
170 request: Request<proto::FindPathBetweenRequest>,
171 ) -> TonicResult<proto::Path> {
172 tracing::info!("{:?}", request.get_ref());
173 find_path::FindPath { service: self }
174 .find_path_between(request)
175 .await
176 }
177
178 #[instrument(skip(self, request), err(level = Level::INFO))]
179 async fn count_nodes(
180 &self,
181 request: Request<proto::TraversalRequest>,
182 ) -> TonicResult<proto::CountResponse> {
183 tracing::info!("{:?}", request.get_ref());
184 traversal::SimpleTraversal { service: self }
185 .count_nodes(request)
186 .await
187 }
188
189 #[instrument(skip(self, request), err(level = Level::INFO))]
190 async fn count_edges(
191 &self,
192 request: Request<proto::TraversalRequest>,
193 ) -> TonicResult<proto::CountResponse> {
194 tracing::info!("{:?}", request.get_ref());
195 traversal::SimpleTraversal { service: self }
196 .count_edges(request)
197 .await
198 }
199
200 #[instrument(skip(self, request))]
201 async fn stats(
202 &self,
203 request: Request<proto::StatsRequest>,
204 ) -> TonicResult<proto::StatsResponse> {
205 tracing::info!("{:?}", request.get_ref());
206 let properties_path = self.graph.path().with_extension("properties");
208 let properties_path = properties_path.as_path();
209 let properties = load_properties(properties_path, ".stats")?;
210
211 let stats_path = self.graph.path().with_extension("stats");
213 let stats_path = stats_path.as_path();
214 let stats = load_properties(stats_path, ".stats")?;
215
216 let export_meta_path = self
218 .graph
219 .path()
220 .parent()
221 .ok_or_else(|| {
222 log::error!(
223 "Could not get path to meta/export.json from {}",
224 self.graph.path().display()
225 );
226 tonic::Status::internal("Could not find meta/export.json file")
227 })?
228 .join("meta")
229 .join("export.json");
230 let export_meta_path = export_meta_path.as_path();
231 let export_meta = load_export_meta(export_meta_path);
232 let export_meta = export_meta.as_ref();
233
234 Ok(Response::new(proto::StatsResponse {
235 num_nodes: self.graph.num_nodes() as i64,
236 num_edges: self.graph.num_arcs() as i64,
237 compression_ratio: get_property(&properties, properties_path, "compratio").ok(),
238 bits_per_node: get_property(&properties, properties_path, "bitspernode").ok(),
239 bits_per_edge: get_property(&properties, properties_path, "bitsperlink").ok(),
240 avg_locality: get_property(&stats, stats_path, "avglocality").ok(),
241 indegree_min: get_property(&stats, stats_path, "minindegree")?,
242 indegree_max: get_property(&stats, stats_path, "maxindegree")?,
243 indegree_avg: get_property(&stats, stats_path, "avgindegree")?,
244 outdegree_min: get_property(&stats, stats_path, "minoutdegree")?,
245 outdegree_max: get_property(&stats, stats_path, "maxoutdegree")?,
246 outdegree_avg: get_property(&stats, stats_path, "avgoutdegree")?,
247 export_started_at: export_meta.map(|export_meta| export_meta.export_start.timestamp()),
248 export_ended_at: export_meta.map(|export_meta| export_meta.export_end.timestamp()),
249 num_nodes_by_type: self
250 .graph
251 .num_nodes_by_type()
252 .unwrap_or_else(|e| {
253 log::info!("Missing num_nodes_by_type: {}", e);
254 HashMap::new()
255 })
256 .into_iter()
257 .map(|(type_, count)| {
258 (
259 format!("{type_}"),
260 i64::try_from(count).expect("Node count overflowed i64"),
261 )
262 })
263 .collect(),
264 num_arcs_by_type: self
265 .graph
266 .num_arcs_by_type()
267 .unwrap_or_else(|e| {
268 log::info!("Missing num_arcs_by_type: {}", e);
269 HashMap::new()
270 })
271 .into_iter()
272 .map(|((src_type, dst_type), count)| {
273 (
274 format!("{src_type}:{dst_type}"),
275 i64::try_from(count).expect("Arc count overflowed i64"),
276 )
277 })
278 .collect(),
279 }))
280 }
281}
282
/// Fields read from the JSON export metadata file (`meta/export.json`).
#[derive(serde_derive::Deserialize)]
struct ExportMeta {
    /// Reported by `stats` as `export_started_at`.
    export_start: chrono::DateTime<chrono::Utc>,
    /// Reported by `stats` as `export_ended_at`.
    export_end: chrono::DateTime<chrono::Utc>,
}
288fn load_export_meta(path: &Path) -> Option<ExportMeta> {
289 let file = std::fs::File::open(path)
290 .map_err(|e| {
291 log::error!("Could not open {}: {}", path.display(), e);
292 })
293 .ok()?;
294 let mut export_meta = String::new();
295 std::io::BufReader::new(file)
296 .read_to_string(&mut export_meta)
297 .map_err(|e| {
298 log::error!("Could not read {}: {}", path.display(), e);
299 })
300 .ok()?;
301 let export_meta = serde_json::from_str(&export_meta)
302 .map_err(|e| {
303 log::error!("Could not parse {}: {}", path.display(), e);
304 })
305 .ok()?;
306 Some(export_meta)
307}
308
309#[inline(always)]
310#[allow(clippy::result_large_err)] fn load_properties(path: &Path, suffix: &str) -> Result<HashMap<String, String>, tonic::Status> {
312 let file = std::fs::File::open(path).map_err(|e| {
313 log::error!("Could not open {}: {}", path.display(), e);
314 tonic::Status::internal(format!("Could not open {suffix} file"))
315 })?;
316 let properties = java_properties::read(std::io::BufReader::new(file)).map_err(|e| {
317 log::error!("Could not parse {}: {}", path.display(), e);
318 tonic::Status::internal(format!("Could not parse {suffix} file"))
319 })?;
320 Ok(properties)
321}
322
323#[inline(always)]
324#[allow(clippy::result_large_err)] fn get_property<V: FromStr>(
326 properties: &HashMap<String, String>,
327 properties_path: &Path,
328 name: &str,
329) -> Result<V, tonic::Status>
330where
331 <V as FromStr>::Err: std::fmt::Display,
332{
333 properties
334 .get(name)
335 .ok_or_else(|| {
336 log::error!("Missing {} in {}", name, properties_path.display());
337 tonic::Status::internal(format!("Could not read {name} from .properties"))
338 })?
339 .parse()
340 .map_err(|e| {
341 log::error!(
342 "Could not parse {} from {}",
343 name,
344 properties_path.display()
345 );
346 tonic::Status::internal(format!("Could not parse {name} from .properties: {e}"))
347 })
348}
349
350pub async fn serve<G: SwhOptFullGraph + Sync + Send + 'static>(
351 graph: G,
352 bind_addr: std::net::SocketAddr,
353 statsd_client: cadence::StatsdClient,
354) -> Result<(), tonic::transport::Error> {
355 let statsd_client = Arc::new(statsd_client);
356 let graph = Arc::new(graph);
357
358 let (health_reporter, health_service) = tonic_health::server::health_reporter();
359 health_reporter
360 .set_serving::<TraversalServiceServer<TraversalService<Arc<G>>>>()
361 .await;
362
363 #[cfg(not(feature = "sentry"))]
364 let mut builder = Server::builder();
365 #[cfg(feature = "sentry")]
366 let mut builder =
367 Server::builder().layer(::sentry::integrations::tower::NewSentryLayer::new_from_top());
368 builder
369 .add_service(MiddlewareFor::new(
370 TraversalServiceServer::new(TraversalService::new(graph, Some(statsd_client.clone()))),
371 metrics::MetricsMiddleware::new(statsd_client),
372 ))
373 .add_service(health_service)
374 .add_service(
375 tonic_reflection::server::Builder::configure()
376 .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
377 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
378 .build_v1()
379 .expect("Could not load v1 reflection service"),
380 )
381 .add_service(
382 tonic_reflection::server::Builder::configure()
383 .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
384 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
385 .build_v1alpha()
386 .expect("Could not load v1alpha reflection service"),
387 )
388 .serve(bind_addr)
389 .await?;
390
391 Ok(())
392}