1use crate::hook::GeneralOutput;
2use crate::mlmd::artifact::{Artifact, ArtifactOrderByField};
3use crate::time::DateTime;
4use crate::web::{response, Config};
5use actix_web::{get, web, HttpResponse};
6use std::collections::{HashMap, HashSet};
7use std::time::Duration;
8
9#[get("/artifacts/{id}/contents/{name}")]
10async fn get_artifact_content(
11 config: web::Data<Config>,
12 path: web::Path<(i32, String)>,
13) -> actix_web::Result<HttpResponse> {
14 let (id, content_name) = path.into_inner();
15
16 let mut store = config.connect_metadata_store().await?;
17
18 let artifacts = store
19 .get_artifacts()
20 .id(mlmd::metadata::ArtifactId::new(id))
21 .execute()
22 .await
23 .map_err(actix_web::error::ErrorInternalServerError)?;
24 if artifacts.is_empty() {
25 return Err(actix_web::error::ErrorNotFound(format!(
26 "no such artifact: {}",
27 id
28 )));
29 }
30
31 let types = store
32 .get_artifact_types()
33 .id(artifacts[0].type_id)
34 .execute()
35 .await
36 .map_err(actix_web::error::ErrorInternalServerError)?;
37 if artifacts.is_empty() {
38 return Err(actix_web::error::ErrorInternalServerError(format!(
39 "no such artifact type: {}",
40 artifacts[0].type_id.get(),
41 )));
42 }
43 let artifact = Artifact::from((types[0].clone(), artifacts[0].clone()));
44
45 let output = config
46 .hook_runner
47 .run_artifact_content_hook(artifact, &content_name)
48 .await?;
49
50 match output {
51 GeneralOutput::Json(x) => Ok(response::json(&x)),
52 GeneralOutput::Markdown(x) => Ok(response::markdown(&x)),
53 GeneralOutput::Html(x) => Ok(response::html(&x)),
54 GeneralOutput::Redirect(x) => Ok(response::redirect(&x)),
55 }
56}
57
/// Query-string parameters of the `/artifacts/` listing.
///
/// Keys are kebab-case on the wire (`order-by`, `mtime-start`, ...) and
/// `type_name` travels as `type`. `Option` fields that are `None` are left
/// out when the struct is serialized.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct GetArtifactsQuery {
    /// Artifact type name to filter by (query key `type`).
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    pub type_name: Option<String>,
    /// Artifact name to filter by; the store query only uses it when
    /// `type_name` is also set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Context id handed to the store query's `context` filter.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub context: Option<i32>,
    /// Page size; 100 is used when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<usize>,
    /// Offset of the first row of the page; treated as 0 for paging when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<usize>,
    /// Sort field; the type's `Default` value is used when the key is absent.
    #[serde(default)]
    pub order_by: ArtifactOrderByField,
    /// Ascending flag handed to the store's `order_by`; `false` when absent.
    #[serde(default)]
    pub asc: bool,
    /// Start of the update-time range filter (used as the start of `start..`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mtime_start: Option<DateTime>,
    /// End of the update-time range filter (used as the end of `..end`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mtime_end: Option<DateTime>,
}
80
81impl GetArtifactsQuery {
82 pub async fn get_artifacts(
84 &self,
85 store: &mut mlmd::MetadataStore,
86 ) -> anyhow::Result<Vec<mlmd::metadata::Artifact>> {
87 let context_id = if let Some(context) = self.context {
88 Some(mlmd::metadata::ContextId::new(context))
89 } else {
90 None
91 };
92
93 let mut request = store.get_artifacts().limit(self.limit.unwrap_or(100));
94 if let Some(c) = context_id {
95 request = request.context(c)
96 }
97 if let Some(n) = self.offset {
98 request = request.offset(n);
99 }
100 if let Some(n) = &self.type_name {
101 if let Some(m) = &self.name {
102 request = request.type_and_name(n, m);
103 } else {
104 request = request.ty(n);
105 }
106 }
107 request = request.order_by(self.order_by.into(), self.asc);
108
109 match (self.mtime_start, self.mtime_end) {
110 (None, None) => {}
111 (Some(start), None) => {
112 request =
113 request.update_time(Duration::from_millis(start.timestamp_millis() as u64)..);
114 }
115 (None, Some(end)) => {
116 request =
117 request.update_time(..Duration::from_millis(end.timestamp_millis() as u64));
118 }
119 (Some(start), Some(end)) => {
120 request = request.update_time(
121 Duration::from_millis(start.timestamp_millis() as u64)
122 ..Duration::from_millis(end.timestamp_millis() as u64),
123 );
124 }
125 }
126
127 Ok(request.execute().await?)
128 }
129
130 async fn get_artifact_types(
131 &self,
132 store: &mut mlmd::MetadataStore,
133 artifacts: &[mlmd::metadata::Artifact],
134 ) -> anyhow::Result<HashMap<mlmd::metadata::TypeId, mlmd::metadata::ArtifactType>> {
135 let artifact_type_ids = artifacts.iter().map(|x| x.type_id).collect::<HashSet<_>>();
136 Ok(store
137 .get_artifact_types()
138 .ids(artifact_type_ids.into_iter())
139 .execute()
140 .await?
141 .into_iter()
142 .map(|x| (x.id, x))
143 .collect())
144 }
145
146 fn prev(&self) -> Self {
147 let mut this = self.clone();
148 this.offset = Some(
149 self.offset
150 .unwrap_or(0)
151 .saturating_sub(self.limit.unwrap_or(100)),
152 );
153 this
154 }
155
156 fn next(&self) -> Self {
157 let mut this = self.clone();
158 this.offset = Some(self.offset() + self.limit());
159 this
160 }
161
162 fn reset_mtime_start(&self) -> Self {
163 let mut this = self.clone();
164 this.mtime_start = None;
165 this
166 }
167
168 fn reset_mtime_end(&self) -> Self {
169 let mut this = self.clone();
170 this.mtime_end = None;
171 this
172 }
173
174 fn filter_type(&self, type_name: &str) -> Self {
175 let mut this = self.clone();
176 this.type_name = Some(type_name.to_owned());
177 this.offset = None;
178 this
179 }
180
181 fn order_by(&self, field: ArtifactOrderByField, asc: bool) -> Self {
182 let mut this = self.clone();
183 this.order_by = field;
184 this.asc = asc;
185 this.offset = None;
186 this
187 }
188
189 pub fn to_url(&self) -> String {
190 format!("/artifacts/?{}", self.to_qs())
191 }
192
193 pub fn to_qs(&self) -> String {
194 let qs = serde_json::to_value(self)
195 .expect("unreachable")
196 .as_object()
197 .expect("unwrap")
198 .into_iter()
199 .map(|(k, v)| {
200 format!(
201 "{}={}",
202 k,
203 v.to_string().trim_matches('"').replace('+', "%2B") )
205 })
206 .collect::<Vec<_>>();
207 qs.join("&")
208 }
209
210 fn offset(&self) -> usize {
211 self.offset.unwrap_or(0)
212 }
213
214 fn limit(&self) -> usize {
215 self.limit.unwrap_or(100)
216 }
217}
218
219#[get("/artifacts/")]
220pub async fn get_artifacts(
221 config: web::Data<Config>,
222 query: web::Query<GetArtifactsQuery>,
223) -> actix_web::Result<HttpResponse> {
224 let mut store = config.connect_metadata_store().await?;
225
226 let artifacts = query
227 .get_artifacts(&mut store)
228 .await
229 .map_err(actix_web::error::ErrorInternalServerError)?;
230 let artifact_types = query
231 .get_artifact_types(&mut store, &artifacts)
232 .await
233 .map_err(actix_web::error::ErrorInternalServerError)?;
234
235 let mut md = "# Artifacts\n".to_string();
236
237 let mut pager_md = String::new();
238 if query.offset() != 0 {
239 pager_md += &format!(" [<<]({})", query.prev().to_url());
240 } else {
241 pager_md += " <<";
242 }
243 pager_md += &format!(
244 " {}~{} ",
245 query.offset() + 1,
246 query.offset() + artifacts.len()
247 );
248 if artifacts.len() == query.limit() {
249 pager_md += &format!("[>>]({})", query.next().to_url());
250 } else {
251 pager_md += ">>";
252 }
253 md += &pager_md;
254
255 md += &format!(
256 r#",
257Update Time: <input type="date" id="start_date" {} onchange="filter_start_date()"> ~
258 <input type="date" id="end_date" {} onchange="filter_end_date()">
259
260<script type="text/javascript">
261function filter_start_date() {{
262 var v = document.getElementById("start_date").value;
263 location.href = "{}&mtime-start=" + v + "T00:00:00%2B09:00";
264}}
265</script>
266<script type="text/javascript">
267function filter_end_date() {{
268 var v = document.getElementById("end_date").value;
269 location.href = "{}&mtime-end=" + v + "T00:00:00%2B09:00";
270}}
271</script>
272"#,
273 if let Some(v) = &query.mtime_start {
274 format!("value={:?}", v.format("%Y-%m-%d").to_string())
275 } else {
276 "".to_owned()
277 },
278 if let Some(v) = &query.mtime_end {
279 format!("value={:?}", v.format("%Y-%m-%d").to_string())
280 } else {
281 "".to_owned()
282 },
283 query.reset_mtime_start().to_url(),
284 query.reset_mtime_end().to_url()
285 );
286
287 md += "\n";
288 md += &format!(
289 "| id{}{} | type | name{}{} | state | update-time{}{} | summary |\n",
290 if query.order_by == ArtifactOrderByField::Id && query.asc {
291 format!("<")
292 } else {
293 format!(
294 "[<]({})",
295 query.order_by(ArtifactOrderByField::Id, true).to_url()
296 )
297 },
298 if query.order_by == ArtifactOrderByField::Id && !query.asc {
299 format!(">")
300 } else {
301 format!(
302 "[>]({})",
303 query.order_by(ArtifactOrderByField::Id, false).to_url()
304 )
305 },
306 if query.order_by == ArtifactOrderByField::Name && query.asc {
307 format!("<")
308 } else {
309 format!(
310 "[<]({})",
311 query.order_by(ArtifactOrderByField::Name, true).to_url()
312 )
313 },
314 if query.order_by == ArtifactOrderByField::Name && !query.asc {
315 format!(">")
316 } else {
317 format!(
318 "[>]({})",
319 query.order_by(ArtifactOrderByField::Name, false).to_url()
320 )
321 },
322 if query.order_by == ArtifactOrderByField::UpdateTime && query.asc {
323 format!("<")
324 } else {
325 format!(
326 "[<]({})",
327 query
328 .order_by(ArtifactOrderByField::UpdateTime, true)
329 .to_url()
330 )
331 },
332 if query.order_by == ArtifactOrderByField::UpdateTime && !query.asc {
333 format!(">")
334 } else {
335 format!(
336 "[>]({})",
337 query
338 .order_by(ArtifactOrderByField::UpdateTime, false)
339 .to_url()
340 )
341 }
342 );
343 md += "|------|------|--------|-------|-------|--------|\n";
344
345 let artifacts = artifacts
346 .into_iter()
347 .map(|a| Artifact::from((artifact_types[&a.type_id].clone(), a)))
348 .collect();
349 let artifacts = config
350 .hook_runner
351 .run_artifact_summary_hook(artifacts)
352 .await?;
353 for a in artifacts {
354 md += &format!(
355 "| [{}]({}) | [{}]({}) | {} | {} | {} | {} |\n",
356 a.id,
357 format!("/artifacts/{}", a.id),
358 a.type_name,
359 query.filter_type(&a.type_name).to_url(),
360 a.name.as_ref().map_or("", |x| x.as_str()),
361 a.state,
362 a.mtime,
363 a.summary.as_ref().map_or("", |x| x.as_str())
364 );
365 }
366
367 md += "\n";
368 md += &pager_md;
369
370 Ok(response::markdown(&md))
371}
372
/// Query-string parameters of the `/artifacts/{id}` detail page.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct GetArtifactQuery {
    /// Artifact type name (query key `type`); the handler requires it when the
    /// path segment is not a numeric id and is looked up as an artifact name.
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    pub type_name: Option<String>,
}
379
380#[get("/artifacts/{id}")]
381pub async fn get_artifact(
382 config: web::Data<Config>,
383 path: web::Path<(String,)>,
384 query: web::Query<GetArtifactQuery>,
385) -> actix_web::Result<HttpResponse> {
386 let id_or_name = &path.0;
387 let mut store = config.connect_metadata_store().await?;
388
389 let artifacts = match id_or_name.parse::<i32>().ok() {
390 Some(id) => store
391 .get_artifacts()
392 .id(mlmd::metadata::ArtifactId::new(id))
393 .execute()
394 .await
395 .map_err(actix_web::error::ErrorInternalServerError)?,
396 None => {
397 let name = id_or_name;
398 if let Some(type_name) = &query.type_name {
399 store
400 .get_artifacts()
401 .type_and_name(type_name, name)
402 .execute()
403 .await
404 .map_err(actix_web::error::ErrorInternalServerError)?
405 } else {
406 return Err(actix_web::error::ErrorBadRequest(format!(
407 "`type` query parameter must be specified"
408 )));
409 }
410 }
411 };
412 if artifacts.is_empty() {
413 return Err(actix_web::error::ErrorNotFound(format!(
414 "no such artifact: {:?}",
415 id_or_name
416 )));
417 }
418
419 let types = store
420 .get_artifact_types()
421 .id(artifacts[0].type_id)
422 .execute()
423 .await
424 .map_err(actix_web::error::ErrorInternalServerError)?;
425 if artifacts.is_empty() {
426 return Err(actix_web::error::ErrorInternalServerError(format!(
427 "no such artifact type: {}",
428 artifacts[0].type_id.get(),
429 )));
430 }
431
432 let artifact = Artifact::from((types[0].clone(), artifacts[0].clone()));
433 let artifact = config
434 .hook_runner
435 .run_artifact_detail_hook(artifact)
436 .await?;
437
438 let mut md = "# Artifact\n".to_string();
439
440 md += &format!("- **ID**: {}\n", artifact.id);
441 md += &format!(
442 "- **Type**: [{}](/artifact_types/{})\n",
443 artifact.type_name,
444 types[0].id.get()
445 );
446 if let Some(x) = &artifact.name {
447 md += &format!("- **Name**: {}\n", x);
448 }
449 if let Some(x) = &artifact.uri {
450 md += &format!("- **URI**: {}\n", x);
451 }
452 md += &format!("- **State**: {}\n", artifact.state);
453 md += &format!("- **Create Time**: {}\n", artifact.ctime);
454 md += &format!("- **Update Time**: {}\n", artifact.mtime);
455
456 if !artifact.properties.is_empty() {
457 md += &format!("- **Properties**:\n");
458 for (k, v) in &artifact.properties {
459 md += &format!(" - **{}**: {}\n", k, v);
460 }
461 }
462 if !artifact.custom_properties.is_empty() {
463 md += &format!("- **Custom Properties**:\n");
464 for (k, v) in &artifact.custom_properties {
465 md += &format!(" - **{}**: {}\n", k, v);
466 }
467 }
468 if !artifact.extra_properties.is_empty() {
469 md += &format!("- **Extra Properties**:\n");
470 for (k, v) in &artifact.extra_properties {
471 md += &format!(" - **{}**: {}\n", k, v);
472 }
473 }
474
475 let contexts_len = store
476 .get_contexts()
477 .artifact(mlmd::metadata::ArtifactId::new(artifact.id))
478 .count()
479 .await
480 .map_err(actix_web::error::ErrorInternalServerError)?;
481
482 let events_len = store
483 .get_events()
484 .artifact(mlmd::metadata::ArtifactId::new(artifact.id))
485 .count()
486 .await
487 .map_err(actix_web::error::ErrorInternalServerError)?;
488 if contexts_len > 0 {
489 md += &format!(
490 "- [**Contexts**](/contexts/?artifact={}) ({})\n",
491 artifact.id, contexts_len
492 );
493 }
494 if events_len > 0 {
495 md += &format!(
496 "- [**Events**](/events/?artifact={}) ({})\n",
497 artifact.id, events_len
498 );
499 }
500
501 md += &format!("- [**Graph**](/artifacts/{}/graph)\n", artifact.id);
502
503 Ok(response::markdown(&md))
504}
505
506#[get("/artifacts/{id}/graph")]
507pub async fn get_artifact_graph(
508 config: web::Data<Config>,
509 path: web::Path<(i32,)>,
510) -> actix_web::Result<HttpResponse> {
511 let id = path.0;
512 let mut store = config.connect_metadata_store().await?;
513
514 let graph = Graph::new(&mut store, id)
515 .await
516 .map_err(actix_web::error::ErrorInternalServerError)?;
517
518 let svg = std::process::Command::new("dot")
519 .arg("-Tsvg")
520 .stdin(std::process::Stdio::piped())
521 .stdout(std::process::Stdio::piped())
522 .spawn()
523 .ok()
524 .and_then(|mut child| {
525 use std::io::Write;
526
527 let writer = child.stdin.as_mut()?;
528 graph.render(writer).ok()?;
529 writer.flush().ok()?;
530 let output = child.wait_with_output().ok()?;
531 if !output.status.success() {
532 None
533 } else {
534 Some(output.stdout)
535 }
536 });
537
538 if let Some(svg) = svg {
539 Ok(response::svg(&String::from_utf8(svg).expect("TODO")))
540 } else {
541 let mut buf = Vec::new();
542 graph.render(&mut buf).expect("TODO");
543 Ok(response::markdown(&String::from_utf8(buf).expect("TODO")))
544 }
545}
546
547use crate::web::handlers::executions::{Edge, Node, NodeId};
548
/// Executions and artifacts collected around one artifact, renderable as a
/// Graphviz `digraph`.
#[derive(Debug)]
struct Graph {
    /// Every visited execution or artifact; the order is unspecified because
    /// the nodes are collected from a `HashMap`.
    nodes: Vec<Node>,
    /// Directed `source -> target` edges, each carrying the event it was built from.
    edges: Vec<Edge>,
}
554
555impl Graph {
556 async fn new(store: &mut mlmd::MetadataStore, artifact_id: i32) -> anyhow::Result<Self> {
557 let mut nodes = HashMap::new();
558 let mut edges = Vec::new();
559 let mut stack = vec![NodeId::Artifact(artifact_id)];
560 while let Some(curr) = stack.pop() {
561 if nodes.contains_key(&curr) {
562 continue;
563 }
564 let mut curr = match curr {
565 NodeId::Execution(id) => Node::Execution {
566 node: fetch_execution(store, id).await?,
567 inputs: 0,
568 outputs: 0,
569 },
570 NodeId::Artifact(id) => Node::Artifact {
571 node: fetch_artifact(store, id).await?,
572 inputs: 0,
573 outputs: 0,
574 },
575 };
576
577 let events = match &curr {
578 Node::Execution { node, .. } => {
579 store
580 .get_events()
581 .execution(mlmd::metadata::ExecutionId::new(node.id))
582 .execute()
583 .await?
584 }
585 Node::Artifact { node, .. } => {
586 store
587 .get_events()
588 .artifact(mlmd::metadata::ArtifactId::new(node.id))
589 .execute()
590 .await?
591 }
592 };
593 curr.set_in_out(&events);
594 nodes.insert(curr.id(), curr.clone());
595 anyhow::ensure!(
596 nodes.len() < 100,
597 "Too many executions and artifact to visualize"
598 );
599
600 for event in events {
601 if matches!(curr, Node::Artifact { .. }) {
602 use mlmd::metadata::EventType::*;
603 if event.artifact_id.get() == artifact_id
604 || matches!(event.ty, Output | DeclaredOutput | InternalOutput)
605 {
606 let id = NodeId::Execution(event.execution_id.get());
607 stack.push(id);
608 if matches!(event.ty, Output | DeclaredOutput | InternalOutput) {
609 edges.push(Edge {
610 source: id,
611 target: curr.id(),
612 event: event.into(),
613 });
614 }
615 }
616 } else {
617 use mlmd::metadata::EventType::*;
618 if matches!(event.ty, Input | DeclaredInput | InternalInput) {
619 let id = NodeId::Artifact(event.artifact_id.get());
620 stack.push(id);
621 edges.push(Edge {
622 source: id,
623 target: curr.id(),
624 event: event.into(),
625 });
626 }
627 }
628 }
629 }
630
631 Ok(Self {
632 nodes: nodes.into_iter().map(|x| x.1).collect(),
633 edges,
634 })
635 }
636
637 fn render<W: std::io::Write>(&self, writer: &mut W) -> anyhow::Result<()> {
638 writeln!(writer, "digraph execution_graph {{")?;
639
640 for node in &self.nodes {
641 writeln!(writer, "{}[{}]", node.id(), node.attrs().join(","))?;
642 }
643
644 for edge in &self.edges {
645 writeln!(
646 writer,
647 "{} -> {} [label={:?}];",
648 edge.source,
649 edge.target,
650 format!("{:?}:{:?}", edge.event.ty, edge.event.path)
651 )?;
652 }
653
654 writeln!(writer, "}}")?;
655 Ok(())
656 }
657}
658
659async fn fetch_execution(
660 store: &mut mlmd::MetadataStore,
661 id: i32,
662) -> anyhow::Result<crate::mlmd::execution::Execution> {
663 let executions = store
664 .get_executions()
665 .id(mlmd::metadata::ExecutionId::new(id))
666 .execute()
667 .await?;
668 anyhow::ensure!(!executions.is_empty(), "no such execution: {}", id);
669
670 let types = store
671 .get_execution_types()
672 .id(executions[0].type_id)
673 .execute()
674 .await?;
675 anyhow::ensure!(
676 !executions.is_empty(),
677 "no such execution tyep: {}",
678 executions[0].type_id.get()
679 );
680
681 Ok(crate::mlmd::execution::Execution::from((
682 types[0].clone(),
683 executions[0].clone(),
684 )))
685}
686
687async fn fetch_artifact(
688 store: &mut mlmd::MetadataStore,
689 id: i32,
690) -> anyhow::Result<crate::mlmd::artifact::Artifact> {
691 let artifacts = store
692 .get_artifacts()
693 .id(mlmd::metadata::ArtifactId::new(id))
694 .execute()
695 .await?;
696 anyhow::ensure!(!artifacts.is_empty(), "no such artifact: {}", id);
697
698 let types = store
699 .get_artifact_types()
700 .id(artifacts[0].type_id)
701 .execute()
702 .await?;
703 anyhow::ensure!(
704 !artifacts.is_empty(),
705 "no such artifact tyep: {}",
706 artifacts[0].type_id.get()
707 );
708
709 Ok(crate::mlmd::artifact::Artifact::from((
710 types[0].clone(),
711 artifacts[0].clone(),
712 )))
713}