weih/web/handlers/
artifacts.rs

1use crate::hook::GeneralOutput;
2use crate::mlmd::artifact::{Artifact, ArtifactOrderByField};
3use crate::time::DateTime;
4use crate::web::{response, Config};
5use actix_web::{get, web, HttpResponse};
6use std::collections::{HashMap, HashSet};
7use std::time::Duration;
8
9#[get("/artifacts/{id}/contents/{name}")]
10async fn get_artifact_content(
11    config: web::Data<Config>,
12    path: web::Path<(i32, String)>,
13) -> actix_web::Result<HttpResponse> {
14    let (id, content_name) = path.into_inner();
15
16    let mut store = config.connect_metadata_store().await?;
17
18    let artifacts = store
19        .get_artifacts()
20        .id(mlmd::metadata::ArtifactId::new(id))
21        .execute()
22        .await
23        .map_err(actix_web::error::ErrorInternalServerError)?;
24    if artifacts.is_empty() {
25        return Err(actix_web::error::ErrorNotFound(format!(
26            "no such artifact: {}",
27            id
28        )));
29    }
30
31    let types = store
32        .get_artifact_types()
33        .id(artifacts[0].type_id)
34        .execute()
35        .await
36        .map_err(actix_web::error::ErrorInternalServerError)?;
37    if artifacts.is_empty() {
38        return Err(actix_web::error::ErrorInternalServerError(format!(
39            "no such artifact type: {}",
40            artifacts[0].type_id.get(),
41        )));
42    }
43    let artifact = Artifact::from((types[0].clone(), artifacts[0].clone()));
44
45    let output = config
46        .hook_runner
47        .run_artifact_content_hook(artifact, &content_name)
48        .await?;
49
50    match output {
51        GeneralOutput::Json(x) => Ok(response::json(&x)),
52        GeneralOutput::Markdown(x) => Ok(response::markdown(&x)),
53        GeneralOutput::Html(x) => Ok(response::html(&x)),
54        GeneralOutput::Redirect(x) => Ok(response::redirect(&x)),
55    }
56}
57
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59#[serde(rename_all = "kebab-case")]
60pub struct GetArtifactsQuery {
61    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
62    pub type_name: Option<String>,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub name: Option<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub context: Option<i32>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub limit: Option<usize>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub offset: Option<usize>,
71    #[serde(default)]
72    pub order_by: ArtifactOrderByField,
73    #[serde(default)]
74    pub asc: bool,
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub mtime_start: Option<DateTime>,
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub mtime_end: Option<DateTime>,
79}
80
81impl GetArtifactsQuery {
82    // TODO
83    pub async fn get_artifacts(
84        &self,
85        store: &mut mlmd::MetadataStore,
86    ) -> anyhow::Result<Vec<mlmd::metadata::Artifact>> {
87        let context_id = if let Some(context) = self.context {
88            Some(mlmd::metadata::ContextId::new(context))
89        } else {
90            None
91        };
92
93        let mut request = store.get_artifacts().limit(self.limit.unwrap_or(100));
94        if let Some(c) = context_id {
95            request = request.context(c)
96        }
97        if let Some(n) = self.offset {
98            request = request.offset(n);
99        }
100        if let Some(n) = &self.type_name {
101            if let Some(m) = &self.name {
102                request = request.type_and_name(n, m);
103            } else {
104                request = request.ty(n);
105            }
106        }
107        request = request.order_by(self.order_by.into(), self.asc);
108
109        match (self.mtime_start, self.mtime_end) {
110            (None, None) => {}
111            (Some(start), None) => {
112                request =
113                    request.update_time(Duration::from_millis(start.timestamp_millis() as u64)..);
114            }
115            (None, Some(end)) => {
116                request =
117                    request.update_time(..Duration::from_millis(end.timestamp_millis() as u64));
118            }
119            (Some(start), Some(end)) => {
120                request = request.update_time(
121                    Duration::from_millis(start.timestamp_millis() as u64)
122                        ..Duration::from_millis(end.timestamp_millis() as u64),
123                );
124            }
125        }
126
127        Ok(request.execute().await?)
128    }
129
130    async fn get_artifact_types(
131        &self,
132        store: &mut mlmd::MetadataStore,
133        artifacts: &[mlmd::metadata::Artifact],
134    ) -> anyhow::Result<HashMap<mlmd::metadata::TypeId, mlmd::metadata::ArtifactType>> {
135        let artifact_type_ids = artifacts.iter().map(|x| x.type_id).collect::<HashSet<_>>();
136        Ok(store
137            .get_artifact_types()
138            .ids(artifact_type_ids.into_iter())
139            .execute()
140            .await?
141            .into_iter()
142            .map(|x| (x.id, x))
143            .collect())
144    }
145
146    fn prev(&self) -> Self {
147        let mut this = self.clone();
148        this.offset = Some(
149            self.offset
150                .unwrap_or(0)
151                .saturating_sub(self.limit.unwrap_or(100)),
152        );
153        this
154    }
155
156    fn next(&self) -> Self {
157        let mut this = self.clone();
158        this.offset = Some(self.offset() + self.limit());
159        this
160    }
161
162    fn reset_mtime_start(&self) -> Self {
163        let mut this = self.clone();
164        this.mtime_start = None;
165        this
166    }
167
168    fn reset_mtime_end(&self) -> Self {
169        let mut this = self.clone();
170        this.mtime_end = None;
171        this
172    }
173
174    fn filter_type(&self, type_name: &str) -> Self {
175        let mut this = self.clone();
176        this.type_name = Some(type_name.to_owned());
177        this.offset = None;
178        this
179    }
180
181    fn order_by(&self, field: ArtifactOrderByField, asc: bool) -> Self {
182        let mut this = self.clone();
183        this.order_by = field;
184        this.asc = asc;
185        this.offset = None;
186        this
187    }
188
189    pub fn to_url(&self) -> String {
190        format!("/artifacts/?{}", self.to_qs())
191    }
192
193    pub fn to_qs(&self) -> String {
194        let qs = serde_json::to_value(self)
195            .expect("unreachable")
196            .as_object()
197            .expect("unwrap")
198            .into_iter()
199            .map(|(k, v)| {
200                format!(
201                    "{}={}",
202                    k,
203                    v.to_string().trim_matches('"').replace('+', "%2B") // TODO: escape
204                )
205            })
206            .collect::<Vec<_>>();
207        qs.join("&")
208    }
209
210    fn offset(&self) -> usize {
211        self.offset.unwrap_or(0)
212    }
213
214    fn limit(&self) -> usize {
215        self.limit.unwrap_or(100)
216    }
217}
218
219#[get("/artifacts/")]
220pub async fn get_artifacts(
221    config: web::Data<Config>,
222    query: web::Query<GetArtifactsQuery>,
223) -> actix_web::Result<HttpResponse> {
224    let mut store = config.connect_metadata_store().await?;
225
226    let artifacts = query
227        .get_artifacts(&mut store)
228        .await
229        .map_err(actix_web::error::ErrorInternalServerError)?;
230    let artifact_types = query
231        .get_artifact_types(&mut store, &artifacts)
232        .await
233        .map_err(actix_web::error::ErrorInternalServerError)?;
234
235    let mut md = "# Artifacts\n".to_string();
236
237    let mut pager_md = String::new();
238    if query.offset() != 0 {
239        pager_md += &format!(" [<<]({})", query.prev().to_url());
240    } else {
241        pager_md += " <<";
242    }
243    pager_md += &format!(
244        " {}~{} ",
245        query.offset() + 1,
246        query.offset() + artifacts.len()
247    );
248    if artifacts.len() == query.limit() {
249        pager_md += &format!("[>>]({})", query.next().to_url());
250    } else {
251        pager_md += ">>";
252    }
253    md += &pager_md;
254
255    md += &format!(
256        r#",
257Update Time: <input type="date" id="start_date" {} onchange="filter_start_date()"> ~
258             <input type="date" id="end_date" {} onchange="filter_end_date()">
259
260<script type="text/javascript">
261function filter_start_date() {{
262  var v = document.getElementById("start_date").value;
263  location.href = "{}&mtime-start=" + v + "T00:00:00%2B09:00";
264}}
265</script>
266<script type="text/javascript">
267function filter_end_date() {{
268  var v = document.getElementById("end_date").value;
269  location.href = "{}&mtime-end=" + v + "T00:00:00%2B09:00";
270}}
271</script>
272"#,
273        if let Some(v) = &query.mtime_start {
274            format!("value={:?}", v.format("%Y-%m-%d").to_string())
275        } else {
276            "".to_owned()
277        },
278        if let Some(v) = &query.mtime_end {
279            format!("value={:?}", v.format("%Y-%m-%d").to_string())
280        } else {
281            "".to_owned()
282        },
283        query.reset_mtime_start().to_url(),
284        query.reset_mtime_end().to_url()
285    );
286
287    md += "\n";
288    md += &format!(
289        "| id{}{} | type | name{}{} | state | update-time{}{} | summary |\n",
290        if query.order_by == ArtifactOrderByField::Id && query.asc {
291            format!("<")
292        } else {
293            format!(
294                "[<]({})",
295                query.order_by(ArtifactOrderByField::Id, true).to_url()
296            )
297        },
298        if query.order_by == ArtifactOrderByField::Id && !query.asc {
299            format!(">")
300        } else {
301            format!(
302                "[>]({})",
303                query.order_by(ArtifactOrderByField::Id, false).to_url()
304            )
305        },
306        if query.order_by == ArtifactOrderByField::Name && query.asc {
307            format!("<")
308        } else {
309            format!(
310                "[<]({})",
311                query.order_by(ArtifactOrderByField::Name, true).to_url()
312            )
313        },
314        if query.order_by == ArtifactOrderByField::Name && !query.asc {
315            format!(">")
316        } else {
317            format!(
318                "[>]({})",
319                query.order_by(ArtifactOrderByField::Name, false).to_url()
320            )
321        },
322        if query.order_by == ArtifactOrderByField::UpdateTime && query.asc {
323            format!("<")
324        } else {
325            format!(
326                "[<]({})",
327                query
328                    .order_by(ArtifactOrderByField::UpdateTime, true)
329                    .to_url()
330            )
331        },
332        if query.order_by == ArtifactOrderByField::UpdateTime && !query.asc {
333            format!(">")
334        } else {
335            format!(
336                "[>]({})",
337                query
338                    .order_by(ArtifactOrderByField::UpdateTime, false)
339                    .to_url()
340            )
341        }
342    );
343    md += "|------|------|--------|-------|-------|--------|\n";
344
345    let artifacts = artifacts
346        .into_iter()
347        .map(|a| Artifact::from((artifact_types[&a.type_id].clone(), a)))
348        .collect();
349    let artifacts = config
350        .hook_runner
351        .run_artifact_summary_hook(artifacts)
352        .await?;
353    for a in artifacts {
354        md += &format!(
355            "| [{}]({}) | [{}]({}) | {} | {} | {} | {} |\n",
356            a.id,
357            format!("/artifacts/{}", a.id),
358            a.type_name,
359            query.filter_type(&a.type_name).to_url(),
360            a.name.as_ref().map_or("", |x| x.as_str()),
361            a.state,
362            a.mtime,
363            a.summary.as_ref().map_or("", |x| x.as_str())
364        );
365    }
366
367    md += "\n";
368    md += &pager_md;
369
370    Ok(response::markdown(&md))
371}
372
373#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
374#[serde(rename_all = "kebab-case")]
375pub struct GetArtifactQuery {
376    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
377    pub type_name: Option<String>,
378}
379
380#[get("/artifacts/{id}")]
381pub async fn get_artifact(
382    config: web::Data<Config>,
383    path: web::Path<(String,)>,
384    query: web::Query<GetArtifactQuery>,
385) -> actix_web::Result<HttpResponse> {
386    let id_or_name = &path.0;
387    let mut store = config.connect_metadata_store().await?;
388
389    let artifacts = match id_or_name.parse::<i32>().ok() {
390        Some(id) => store
391            .get_artifacts()
392            .id(mlmd::metadata::ArtifactId::new(id))
393            .execute()
394            .await
395            .map_err(actix_web::error::ErrorInternalServerError)?,
396        None => {
397            let name = id_or_name;
398            if let Some(type_name) = &query.type_name {
399                store
400                    .get_artifacts()
401                    .type_and_name(type_name, name)
402                    .execute()
403                    .await
404                    .map_err(actix_web::error::ErrorInternalServerError)?
405            } else {
406                return Err(actix_web::error::ErrorBadRequest(format!(
407                    "`type` query parameter must be specified"
408                )));
409            }
410        }
411    };
412    if artifacts.is_empty() {
413        return Err(actix_web::error::ErrorNotFound(format!(
414            "no such artifact: {:?}",
415            id_or_name
416        )));
417    }
418
419    let types = store
420        .get_artifact_types()
421        .id(artifacts[0].type_id)
422        .execute()
423        .await
424        .map_err(actix_web::error::ErrorInternalServerError)?;
425    if artifacts.is_empty() {
426        return Err(actix_web::error::ErrorInternalServerError(format!(
427            "no such artifact type: {}",
428            artifacts[0].type_id.get(),
429        )));
430    }
431
432    let artifact = Artifact::from((types[0].clone(), artifacts[0].clone()));
433    let artifact = config
434        .hook_runner
435        .run_artifact_detail_hook(artifact)
436        .await?;
437
438    let mut md = "# Artifact\n".to_string();
439
440    md += &format!("- **ID**: {}\n", artifact.id);
441    md += &format!(
442        "- **Type**: [{}](/artifact_types/{})\n",
443        artifact.type_name,
444        types[0].id.get()
445    );
446    if let Some(x) = &artifact.name {
447        md += &format!("- **Name**: {}\n", x);
448    }
449    if let Some(x) = &artifact.uri {
450        md += &format!("- **URI**: {}\n", x);
451    }
452    md += &format!("- **State**: {}\n", artifact.state);
453    md += &format!("- **Create Time**: {}\n", artifact.ctime);
454    md += &format!("- **Update Time**: {}\n", artifact.mtime);
455
456    if !artifact.properties.is_empty() {
457        md += &format!("- **Properties**:\n");
458        for (k, v) in &artifact.properties {
459            md += &format!("  - **{}**: {}\n", k, v);
460        }
461    }
462    if !artifact.custom_properties.is_empty() {
463        md += &format!("- **Custom Properties**:\n");
464        for (k, v) in &artifact.custom_properties {
465            md += &format!("  - **{}**: {}\n", k, v);
466        }
467    }
468    if !artifact.extra_properties.is_empty() {
469        md += &format!("- **Extra Properties**:\n");
470        for (k, v) in &artifact.extra_properties {
471            md += &format!("  - **{}**: {}\n", k, v);
472        }
473    }
474
475    let contexts_len = store
476        .get_contexts()
477        .artifact(mlmd::metadata::ArtifactId::new(artifact.id))
478        .count()
479        .await
480        .map_err(actix_web::error::ErrorInternalServerError)?;
481
482    let events_len = store
483        .get_events()
484        .artifact(mlmd::metadata::ArtifactId::new(artifact.id))
485        .count()
486        .await
487        .map_err(actix_web::error::ErrorInternalServerError)?;
488    if contexts_len > 0 {
489        md += &format!(
490            "- [**Contexts**](/contexts/?artifact={}) ({})\n",
491            artifact.id, contexts_len
492        );
493    }
494    if events_len > 0 {
495        md += &format!(
496            "- [**Events**](/events/?artifact={}) ({})\n",
497            artifact.id, events_len
498        );
499    }
500
501    md += &format!("- [**Graph**](/artifacts/{}/graph)\n", artifact.id);
502
503    Ok(response::markdown(&md))
504}
505
506#[get("/artifacts/{id}/graph")]
507pub async fn get_artifact_graph(
508    config: web::Data<Config>,
509    path: web::Path<(i32,)>,
510) -> actix_web::Result<HttpResponse> {
511    let id = path.0;
512    let mut store = config.connect_metadata_store().await?;
513
514    let graph = Graph::new(&mut store, id)
515        .await
516        .map_err(actix_web::error::ErrorInternalServerError)?;
517
518    let svg = std::process::Command::new("dot")
519        .arg("-Tsvg")
520        .stdin(std::process::Stdio::piped())
521        .stdout(std::process::Stdio::piped())
522        .spawn()
523        .ok()
524        .and_then(|mut child| {
525            use std::io::Write;
526
527            let writer = child.stdin.as_mut()?;
528            graph.render(writer).ok()?;
529            writer.flush().ok()?;
530            let output = child.wait_with_output().ok()?;
531            if !output.status.success() {
532                None
533            } else {
534                Some(output.stdout)
535            }
536        });
537
538    if let Some(svg) = svg {
539        Ok(response::svg(&String::from_utf8(svg).expect("TODO")))
540    } else {
541        let mut buf = Vec::new();
542        graph.render(&mut buf).expect("TODO");
543        Ok(response::markdown(&String::from_utf8(buf).expect("TODO")))
544    }
545}
546
547use crate::web::handlers::executions::{Edge, Node, NodeId};
548
549#[derive(Debug)]
550struct Graph {
551    nodes: Vec<Node>,
552    edges: Vec<Edge>,
553}
554
555impl Graph {
556    async fn new(store: &mut mlmd::MetadataStore, artifact_id: i32) -> anyhow::Result<Self> {
557        let mut nodes = HashMap::new();
558        let mut edges = Vec::new();
559        let mut stack = vec![NodeId::Artifact(artifact_id)];
560        while let Some(curr) = stack.pop() {
561            if nodes.contains_key(&curr) {
562                continue;
563            }
564            let mut curr = match curr {
565                NodeId::Execution(id) => Node::Execution {
566                    node: fetch_execution(store, id).await?,
567                    inputs: 0,
568                    outputs: 0,
569                },
570                NodeId::Artifact(id) => Node::Artifact {
571                    node: fetch_artifact(store, id).await?,
572                    inputs: 0,
573                    outputs: 0,
574                },
575            };
576
577            let events = match &curr {
578                Node::Execution { node, .. } => {
579                    store
580                        .get_events()
581                        .execution(mlmd::metadata::ExecutionId::new(node.id))
582                        .execute()
583                        .await?
584                }
585                Node::Artifact { node, .. } => {
586                    store
587                        .get_events()
588                        .artifact(mlmd::metadata::ArtifactId::new(node.id))
589                        .execute()
590                        .await?
591                }
592            };
593            curr.set_in_out(&events);
594            nodes.insert(curr.id(), curr.clone());
595            anyhow::ensure!(
596                nodes.len() < 100,
597                "Too many executions and artifact to visualize"
598            );
599
600            for event in events {
601                if matches!(curr, Node::Artifact { .. }) {
602                    use mlmd::metadata::EventType::*;
603                    if event.artifact_id.get() == artifact_id
604                        || matches!(event.ty, Output | DeclaredOutput | InternalOutput)
605                    {
606                        let id = NodeId::Execution(event.execution_id.get());
607                        stack.push(id);
608                        if matches!(event.ty, Output | DeclaredOutput | InternalOutput) {
609                            edges.push(Edge {
610                                source: id,
611                                target: curr.id(),
612                                event: event.into(),
613                            });
614                        }
615                    }
616                } else {
617                    use mlmd::metadata::EventType::*;
618                    if matches!(event.ty, Input | DeclaredInput | InternalInput) {
619                        let id = NodeId::Artifact(event.artifact_id.get());
620                        stack.push(id);
621                        edges.push(Edge {
622                            source: id,
623                            target: curr.id(),
624                            event: event.into(),
625                        });
626                    }
627                }
628            }
629        }
630
631        Ok(Self {
632            nodes: nodes.into_iter().map(|x| x.1).collect(),
633            edges,
634        })
635    }
636
637    fn render<W: std::io::Write>(&self, writer: &mut W) -> anyhow::Result<()> {
638        writeln!(writer, "digraph execution_graph {{")?;
639
640        for node in &self.nodes {
641            writeln!(writer, "{}[{}]", node.id(), node.attrs().join(","))?;
642        }
643
644        for edge in &self.edges {
645            writeln!(
646                writer,
647                "{} -> {} [label={:?}];",
648                edge.source,
649                edge.target,
650                format!("{:?}:{:?}", edge.event.ty, edge.event.path)
651            )?;
652        }
653
654        writeln!(writer, "}}")?;
655        Ok(())
656    }
657}
658
659async fn fetch_execution(
660    store: &mut mlmd::MetadataStore,
661    id: i32,
662) -> anyhow::Result<crate::mlmd::execution::Execution> {
663    let executions = store
664        .get_executions()
665        .id(mlmd::metadata::ExecutionId::new(id))
666        .execute()
667        .await?;
668    anyhow::ensure!(!executions.is_empty(), "no such execution: {}", id);
669
670    let types = store
671        .get_execution_types()
672        .id(executions[0].type_id)
673        .execute()
674        .await?;
675    anyhow::ensure!(
676        !executions.is_empty(),
677        "no such execution tyep: {}",
678        executions[0].type_id.get()
679    );
680
681    Ok(crate::mlmd::execution::Execution::from((
682        types[0].clone(),
683        executions[0].clone(),
684    )))
685}
686
687async fn fetch_artifact(
688    store: &mut mlmd::MetadataStore,
689    id: i32,
690) -> anyhow::Result<crate::mlmd::artifact::Artifact> {
691    let artifacts = store
692        .get_artifacts()
693        .id(mlmd::metadata::ArtifactId::new(id))
694        .execute()
695        .await?;
696    anyhow::ensure!(!artifacts.is_empty(), "no such artifact: {}", id);
697
698    let types = store
699        .get_artifact_types()
700        .id(artifacts[0].type_id)
701        .execute()
702        .await?;
703    anyhow::ensure!(
704        !artifacts.is_empty(),
705        "no such artifact tyep: {}",
706        artifacts[0].type_id.get()
707    );
708
709    Ok(crate::mlmd::artifact::Artifact::from((
710        types[0].clone(),
711        artifacts[0].clone(),
712    )))
713}