use crate::ids::types::{IdsError, IdsResult, IdsUri};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
// Vocabulary namespaces used to build N-Triples output. Each IRI already ends in its
// `#` separator, so a local name can be appended directly (e.g. `{PROV_NS}Entity`).

/// RDF syntax namespace (source of `rdf:type`).
const RDF_NS: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
/// W3C PROV-O namespace.
const PROV_NS: &str = "http://www.w3.org/ns/prov#";
/// XML Schema datatypes namespace (source of `xsd:dateTime`).
const XSD_NS: &str = "http://www.w3.org/2001/XMLSchema#";
/// In-memory store of W3C PROV lineage records, keyed by entity URI.
///
/// Records, activities and agents each live in their own `Arc<RwLock<HashMap<..>>>`.
/// Cloning a `ProvenanceGraph` is therefore cheap. The clone *shares* (does not copy)
/// the stored records, activities and agents with the original; only `graph_uri` is copied.
#[derive(Debug, Clone)]
pub struct ProvenanceGraph {
    /// URI of the named graph, used in the `FROM` clause of generated SPARQL.
    graph_uri: String,
    /// Lineage records keyed by the string form of their entity URI.
    records: Arc<RwLock<HashMap<String, LineageRecord>>>,
    /// Activities registered through recorded lineage, keyed by activity URI.
    activities: Arc<RwLock<HashMap<String, Activity>>>,
    /// Agents registered through recorded lineage, keyed by agent URI.
    agents: Arc<RwLock<HashMap<String, Agent>>>,
}
impl ProvenanceGraph {
    /// Default number of derivation hops followed by
    /// [`query_full_lineage`](Self::query_full_lineage), counting the queried entity as hop 0.
    pub const DEFAULT_MAX_LINEAGE_DEPTH: usize = 10;

    /// Creates an empty provenance graph.
    ///
    /// `graph_uri` names the graph used in the `FROM` clause of
    /// [`generate_lineage_query`](Self::generate_lineage_query).
    pub fn new(graph_uri: impl Into<String>) -> Self {
        Self {
            graph_uri: graph_uri.into(),
            records: Arc::new(RwLock::new(HashMap::new())),
            activities: Arc::new(RwLock::new(HashMap::new())),
            agents: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Returns the graph URI this store was created with.
    pub fn graph_uri(&self) -> &str {
        &self.graph_uri
    }

    /// Stores `record` under its entity URI, replacing any previous record for that entity.
    ///
    /// When present, the record's generating activity and attributed agent are also
    /// registered under their own URIs, so [`statistics`](Self::statistics) counts them.
    ///
    /// # Errors
    ///
    /// Currently never fails; the `IdsResult` return type is kept for API stability.
    pub async fn record_lineage(&self, record: LineageRecord) -> IdsResult<()> {
        let entity_uri = record.entity.as_str().to_string();
        // Clone only the (small) activity and agent, so the record itself can be moved into the map.
        let activity = record.generated_by.clone();
        let agent = record.attributed_to.clone();

        // Each write guard is a temporary dropped at the end of its statement,
        // so at most one lock is held at any time.
        self.records.write().await.insert(entity_uri, record);
        if let Some(activity) = activity {
            self.activities
                .write()
                .await
                .insert(activity.id.as_str().to_string(), activity);
        }
        if let Some(agent) = agent {
            self.agents
                .write()
                .await
                .insert(agent.id.as_str().to_string(), agent);
        }
        Ok(())
    }

    /// Returns the record for `entity`, followed by the records of its *direct* sources.
    ///
    /// Sources without a stored record are skipped. An unknown `entity` yields an empty vector.
    /// Use [`query_full_lineage`](Self::query_full_lineage) for transitive ancestry.
    pub async fn query_lineage(&self, entity: &IdsUri) -> IdsResult<Vec<LineageRecord>> {
        let records = self.records.read().await;
        let mut result = Vec::new();
        if let Some(record) = records.get(entity.as_str()) {
            result.push(record.clone());
            result.extend(
                record
                    .derived_from
                    .iter()
                    .filter_map(|source| records.get(source.as_str()))
                    .cloned(),
            );
        }
        Ok(result)
    }

    /// Collects the transitive ancestry of `entity`, following `derived_from` links for up to
    /// [`DEFAULT_MAX_LINEAGE_DEPTH`](Self::DEFAULT_MAX_LINEAGE_DEPTH) hops.
    ///
    /// Each reachable record appears in the chain exactly once, even when the derivation
    /// graph contains diamonds or cycles.
    pub async fn query_full_lineage(&self, entity: &IdsUri) -> IdsResult<LineageChain> {
        self.query_full_lineage_with_depth(entity, Self::DEFAULT_MAX_LINEAGE_DEPTH)
            .await
    }

    /// Like [`query_full_lineage`](Self::query_full_lineage), but with an explicit hop limit.
    ///
    /// `max_depth` counts the queried entity as depth 0, so `max_depth == 0` yields an empty
    /// chain and `max_depth == 1` yields at most the entity's own record.
    pub async fn query_full_lineage_with_depth(
        &self,
        entity: &IdsUri,
        max_depth: usize,
    ) -> IdsResult<LineageChain> {
        let records = self.records.read().await;
        let mut chain = LineageChain::new(entity.clone());
        let mut expanded_at = HashMap::new();
        Self::build_lineage_chain(
            &records,
            entity.as_str(),
            &mut chain,
            0,
            max_depth,
            &mut expanded_at,
        )?;
        Ok(chain)
    }

    /// Depth-first, pre-order walk over `derived_from` links, starting at `entity_uri`.
    ///
    /// Each reachable record is appended to `chain` once. `expanded_at` remembers the
    /// shallowest depth at which each entity has been expanded. An entity is re-expanded only
    /// when it is reached again at a strictly shallower depth. As a result:
    /// - diamonds do not duplicate records;
    /// - cycles terminate;
    /// - a source that is within `max_depth` along *some* path is never hidden because it was
    ///   first seen along a longer path.
    fn build_lineage_chain<'a>(
        records: &'a HashMap<String, LineageRecord>,
        entity_uri: &str,
        chain: &mut LineageChain,
        depth: usize,
        max_depth: usize,
        expanded_at: &mut HashMap<&'a str, usize>,
    ) -> IdsResult<()> {
        if depth >= max_depth {
            return Ok(());
        }
        let (key, record) = match records.get_key_value(entity_uri) {
            Some(entry) => entry,
            None => return Ok(()),
        };
        match expanded_at.get(key.as_str()) {
            // Already expanded at this depth or shallower: nothing new is reachable from here.
            Some(&previous) if previous <= depth => return Ok(()),
            // Reached again via a shorter path: re-expand, but do not record it twice.
            Some(_) => {}
            None => chain.records.push(record.clone()),
        }
        expanded_at.insert(key.as_str(), depth);
        for source in &record.derived_from {
            Self::build_lineage_chain(
                records,
                source.as_str(),
                chain,
                depth + 1,
                max_depth,
                expanded_at,
            )?;
        }
        Ok(())
    }

    /// Returns every stored record whose generating activity has the URI `activity_id`.
    pub async fn query_by_activity(&self, activity_id: &IdsUri) -> IdsResult<Vec<LineageRecord>> {
        let records = self.records.read().await;
        let activity_uri = activity_id.as_str();
        Ok(records
            .values()
            .filter(|r| matches!(&r.generated_by, Some(a) if a.id.as_str() == activity_uri))
            .cloned()
            .collect())
    }

    /// Returns every stored record attributed to the agent with the URI `agent_id`.
    pub async fn query_by_agent(&self, agent_id: &IdsUri) -> IdsResult<Vec<LineageRecord>> {
        let records = self.records.read().await;
        let agent_uri = agent_id.as_str();
        Ok(records
            .values()
            .filter(|r| matches!(&r.attributed_to, Some(a) if a.id.as_str() == agent_uri))
            .cloned()
            .collect())
    }

    /// Serializes `record` as N-Triples using the W3C PROV-O vocabulary.
    ///
    /// Emits one triple per line, with no trailing newline:
    /// - for the entity: `rdf:type prov:Entity` and `prov:generatedAtTime`;
    /// - one `prov:wasDerivedFrom` per source;
    /// - for the generating activity: `prov:wasGeneratedBy` and `rdf:type prov:Activity`,
    ///   plus `prov:startedAtTime` / `prov:endedAtTime` when set;
    /// - for the attributed agent: `prov:wasAttributedTo`, `rdf:type prov:Agent`, and
    ///   `rdf:type` of the specific agent class;
    /// - `prov:invalidatedAtTime` carrying the *end* of `validity_period`, if any.
    ///   PROV-O has no property for the start of validity, so the start is not exported.
    ///
    /// URIs are written verbatim; they are assumed to be valid IRIs (`IdsUri`).
    pub fn to_ntriples(&self, record: &LineageRecord) -> String {
        // Typed literal `"<rfc3339>"^^<xsd:dateTime>`.
        let datetime =
            |at: &DateTime<Utc>| format!("\"{}\"^^<{}dateTime>", at.to_rfc3339(), XSD_NS);
        let entity_uri = record.entity.as_str();

        let mut triples = vec![
            format!("<{}> <{}type> <{}Entity> .", entity_uri, RDF_NS, PROV_NS),
            format!(
                "<{}> <{}generatedAtTime> {} .",
                entity_uri,
                PROV_NS,
                datetime(&record.generated_at)
            ),
        ];

        triples.extend(record.derived_from.iter().map(|source| {
            format!(
                "<{}> <{}wasDerivedFrom> <{}> .",
                entity_uri,
                PROV_NS,
                source.as_str()
            )
        }));

        if let Some(activity) = &record.generated_by {
            let activity_uri = activity.id.as_str();
            triples.push(format!(
                "<{}> <{}wasGeneratedBy> <{}> .",
                entity_uri, PROV_NS, activity_uri
            ));
            triples.push(format!(
                "<{}> <{}type> <{}Activity> .",
                activity_uri, RDF_NS, PROV_NS
            ));
            if let Some(started) = &activity.started_at {
                triples.push(format!(
                    "<{}> <{}startedAtTime> {} .",
                    activity_uri,
                    PROV_NS,
                    datetime(started)
                ));
            }
            if let Some(ended) = &activity.ended_at {
                triples.push(format!(
                    "<{}> <{}endedAtTime> {} .",
                    activity_uri,
                    PROV_NS,
                    datetime(ended)
                ));
            }
        }

        if let Some(agent) = &record.attributed_to {
            let agent_uri = agent.id.as_str();
            let agent_class = match agent.agent_type {
                AgentType::Person => "Person",
                AgentType::Organization => "Organization",
                AgentType::SoftwareAgent => "SoftwareAgent",
            };
            triples.push(format!(
                "<{}> <{}wasAttributedTo> <{}> .",
                entity_uri, PROV_NS, agent_uri
            ));
            triples.push(format!(
                "<{}> <{}type> <{}Agent> .",
                agent_uri, RDF_NS, PROV_NS
            ));
            triples.push(format!(
                "<{}> <{}type> <{}{}> .",
                agent_uri, RDF_NS, PROV_NS, agent_class
            ));
        }

        // Only the end of the validity window maps onto PROV-O.
        if let Some((_, valid_until)) = &record.validity_period {
            triples.push(format!(
                "<{}> <{}invalidatedAtTime> {} .",
                entity_uri,
                PROV_NS,
                datetime(valid_until)
            ));
        }

        triples.join("\n")
    }

    /// Builds a SPARQL `CONSTRUCT` query that returns the direct provenance of `entity_uri`
    /// from this store's named graph.
    ///
    /// `entity_uri` and the graph URI are spliced into `<...>` IRI references. Any character
    /// that SPARQL forbids inside an `IRIREF` is first percent-encoded, so a hostile
    /// `entity_uri` (e.g. one containing `>` or `}`) cannot terminate the IRI and inject query text.
    pub fn generate_lineage_query(&self, entity_uri: &str) -> String {
        format!(
            r#"
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
CONSTRUCT {{
?entity prov:wasDerivedFrom ?source .
?entity prov:wasGeneratedBy ?activity .
?entity prov:wasAttributedTo ?agent .
?entity prov:generatedAtTime ?time .
?activity prov:startedAtTime ?actStart .
?activity prov:endedAtTime ?actEnd .
?agent a ?agentType .
}}
FROM <{}>
WHERE {{
VALUES ?entity {{ <{}> }}
OPTIONAL {{ ?entity prov:wasDerivedFrom ?source . }}
OPTIONAL {{ ?entity prov:wasGeneratedBy ?activity .
OPTIONAL {{ ?activity prov:startedAtTime ?actStart . }}
OPTIONAL {{ ?activity prov:endedAtTime ?actEnd . }}
}}
OPTIONAL {{ ?entity prov:wasAttributedTo ?agent .
OPTIONAL {{ ?agent a ?agentType . }}
}}
OPTIONAL {{ ?entity prov:generatedAtTime ?time . }}
}}
"#,
            Self::escape_iri(&self.graph_uri),
            Self::escape_iri(entity_uri)
        )
    }

    /// Percent-encodes the characters SPARQL 1.1 forbids inside an `IRIREF`:
    /// `<`, `>`, `"`, `{`, `}`, `|`, `^`, `` ` ``, `\`, and U+0000 through U+0020.
    /// All other characters, including existing `%XX` escapes, are passed through unchanged.
    fn escape_iri(iri: &str) -> String {
        let mut escaped = String::with_capacity(iri.len());
        for ch in iri.chars() {
            match ch {
                '<' | '>' | '"' | '{' | '}' | '|' | '^' | '`' | '\\' | '\u{0}'..='\u{20}' => {
                    // Every forbidden character is ASCII, so it encodes as a single byte.
                    escaped.push_str(&format!("%{:02X}", u32::from(ch)));
                }
                _ => escaped.push(ch),
            }
        }
        escaped
    }

    /// Serializes every stored record with [`to_ntriples`](Self::to_ntriples) and joins the
    /// results with newlines.
    ///
    /// Records are emitted in ascending entity-URI order, so the output is deterministic.
    /// Activity and agent triples shared by several records are repeated; N-Triples
    /// consumers treat duplicate triples as one.
    pub async fn export_ntriples(&self) -> String {
        let records = self.records.read().await;
        let mut ordered: Vec<(&String, &LineageRecord)> = records.iter().collect();
        // Keys are unique, so an unstable sort is still fully deterministic.
        ordered.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
        ordered
            .into_iter()
            .map(|(_, record)| self.to_ntriples(record))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Returns counts of stored entities, registered activities and agents, and the total
    /// number of `derived_from` links.
    ///
    /// Read locks are taken in the order records → activities → agents.
    pub async fn statistics(&self) -> ProvenanceStatistics {
        let records = self.records.read().await;
        let activities = self.activities.read().await;
        let agents = self.agents.read().await;
        ProvenanceStatistics {
            entity_count: records.len(),
            activity_count: activities.len(),
            agent_count: agents.len(),
            derivation_count: records.values().map(|r| r.derived_from.len()).sum(),
        }
    }
}
impl Default for ProvenanceGraph {
fn default() -> Self {
Self::new("urn:ids:provenance:graph")
}
}
/// Provenance of a single entity: what it was derived from, which activity produced it,
/// which agent it is attributed to, and when.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct LineageRecord {
    /// URI of the described entity; `ProvenanceGraph` stores the record under this URI.
    pub entity: IdsUri,
    /// URIs of the entities this one was derived from (empty when missing from serialized input).
    #[serde(default)]
    pub derived_from: Vec<IdsUri>,
    /// Activity that generated the entity, if known.
    pub generated_by: Option<Activity>,
    /// Agent the entity is attributed to, if known.
    pub attributed_to: Option<Agent>,
    /// Generation timestamp; the constructors set it to the current UTC time.
    pub generated_at: DateTime<Utc>,
    /// Optional `(start, end)` validity window. NOTE(review): `start <= end` is not enforced.
    pub validity_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Optional signature string; its format is not defined or checked in this module.
    pub signature: Option<String>,
}
impl LineageRecord {
pub fn new(entity: IdsUri) -> Self {
Self {
entity,
derived_from: Vec::new(),
generated_by: None,
attributed_to: None,
generated_at: Utc::now(),
validity_period: None,
signature: None,
}
}
pub fn derived(entity: IdsUri, sources: Vec<IdsUri>) -> Self {
Self {
entity,
derived_from: sources,
generated_by: None,
attributed_to: None,
generated_at: Utc::now(),
validity_period: None,
signature: None,
}
}
pub fn with_activity(mut self, activity: Activity) -> Self {
self.generated_by = Some(activity);
self
}
pub fn with_agent(mut self, agent: Agent) -> Self {
self.attributed_to = Some(agent);
self
}
pub fn with_validity(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
self.validity_period = Some((start, end));
self
}
}
/// A PROV activity that produced an entity, with optional start/end timestamps.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct Activity {
    /// URI identifying the activity; used as its key and as its N-Triples subject.
    pub id: IdsUri,
    /// Free-form kind label (e.g. "Transformation"); not exported to N-Triples.
    pub activity_type: String,
    /// When the activity started, if known.
    pub started_at: Option<DateTime<Utc>>,
    /// When the activity ended; `None` while it is still open.
    pub ended_at: Option<DateTime<Utc>>,
}
impl Activity {
pub fn new(id: IdsUri, activity_type: impl Into<String>) -> Self {
Self {
id,
activity_type: activity_type.into(),
started_at: Some(Utc::now()),
ended_at: None,
}
}
pub fn completed(
id: IdsUri,
activity_type: impl Into<String>,
started: DateTime<Utc>,
ended: DateTime<Utc>,
) -> Self {
Self {
id,
activity_type: activity_type.into(),
started_at: Some(started),
ended_at: Some(ended),
}
}
pub fn end(&mut self) {
self.ended_at = Some(Utc::now());
}
}
/// A PROV agent (person, organization or software) that an entity can be attributed to.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct Agent {
    /// URI identifying the agent; used as its key and as its N-Triples subject.
    pub id: IdsUri,
    /// Human-readable name; not exported to N-Triples.
    pub name: String,
    /// Which PROV agent subclass this agent belongs to.
    pub agent_type: AgentType,
}
impl Agent {
pub fn software(id: IdsUri, name: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
agent_type: AgentType::SoftwareAgent,
}
}
pub fn person(id: IdsUri, name: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
agent_type: AgentType::Person,
}
}
pub fn organization(id: IdsUri, name: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
agent_type: AgentType::Organization,
}
}
}
/// Kind of PROV agent.
///
/// `ProvenanceGraph::to_ntriples` exports each variant as the PROV class of the same name
/// (`prov:Person`, `prov:Organization`, `prov:SoftwareAgent`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
pub enum AgentType {
    /// A human being.
    Person,
    /// A social or legal institution.
    Organization,
    /// Running software.
    SoftwareAgent,
}
/// Result of a transitive lineage query.
#[derive(Clone, Debug)]
pub struct LineageChain {
    /// The entity the query started from.
    pub root: IdsUri,
    /// Records gathered by the lineage walk; the root's own record comes first when it exists.
    pub records: Vec<LineageRecord>,
}
impl LineageChain {
fn new(root: IdsUri) -> Self {
Self {
root,
records: Vec::new(),
}
}
pub fn depth(&self) -> usize {
self.records.len()
}
pub fn contains(&self, entity: &IdsUri) -> bool {
self.records.iter().any(|r| &r.entity == entity)
}
}
/// Snapshot of counts reported by `ProvenanceGraph::statistics`.
///
/// Derives `Default` (all zeros) and `PartialEq`/`Eq`, so snapshots can be constructed
/// and compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProvenanceStatistics {
    /// Number of stored lineage records (one per distinct entity URI).
    pub entity_count: usize,
    /// Number of distinct activities registered through recorded lineage.
    pub activity_count: usize,
    /// Number of distinct agents registered through recorded lineage.
    pub agent_count: usize,
    /// Total number of `derived_from` links across all stored records.
    pub derivation_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a test URI; every URI literal in these tests is well-formed.
    fn uri(s: &str) -> IdsUri {
        IdsUri::new(s).expect("valid URI")
    }

    #[tokio::test]
    async fn test_provenance_graph() {
        let pg = ProvenanceGraph::default();
        let source = uri("https://example.org/data/source");
        let derived = uri("https://example.org/data/derived");

        let source_record = LineageRecord::new(source.clone()).with_agent(Agent::software(
            uri("https://example.org/agent/ingester"),
            "Data Ingester",
        ));
        pg.record_lineage(source_record).await.expect("record");

        let derived_record = LineageRecord::derived(derived.clone(), vec![source.clone()])
            .with_activity(Activity::new(
                uri("https://example.org/activity/transform"),
                "Transformation",
            ))
            .with_agent(Agent::software(
                uri("https://example.org/agent/processor"),
                "Data Processor",
            ));
        pg.record_lineage(derived_record).await.expect("record");

        let lineage = pg.query_lineage(&derived).await.expect("query");
        assert_eq!(lineage.len(), 2);

        let stats = pg.statistics().await;
        assert_eq!(stats.entity_count, 2);
        assert_eq!(stats.derivation_count, 1);
    }

    #[tokio::test]
    async fn test_full_lineage_chain() {
        let pg = ProvenanceGraph::default();
        let entity1 = uri("https://example.org/data/1");
        let entity2 = uri("https://example.org/data/2");
        let entity3 = uri("https://example.org/data/3");

        pg.record_lineage(LineageRecord::new(entity1.clone()))
            .await
            .expect("record");
        pg.record_lineage(LineageRecord::derived(entity2.clone(), vec![entity1.clone()]))
            .await
            .expect("record");
        pg.record_lineage(LineageRecord::derived(entity3.clone(), vec![entity2.clone()]))
            .await
            .expect("record");

        let chain = pg.query_full_lineage(&entity3).await.expect("query");
        assert_eq!(chain.root, entity3);
        assert_eq!(chain.depth(), 3);
        assert!(chain.contains(&entity1));
        assert!(chain.contains(&entity2));
        assert!(chain.contains(&entity3));
    }

    #[test]
    fn test_ntriples_generation() {
        let pg = ProvenanceGraph::default();
        let record = LineageRecord {
            entity: uri("https://example.org/data/1"),
            derived_from: vec![uri("https://example.org/data/source")],
            generated_by: Some(Activity {
                id: uri("https://example.org/activity/transform"),
                activity_type: "Transformation".to_string(),
                started_at: Some(Utc::now()),
                ended_at: Some(Utc::now()),
            }),
            attributed_to: Some(Agent {
                id: uri("https://example.org/agent/processor"),
                name: "Data Processor".to_string(),
                agent_type: AgentType::SoftwareAgent,
            }),
            generated_at: Utc::now(),
            validity_period: None,
            signature: None,
        };
        let ntriples = pg.to_ntriples(&record);
        assert!(ntriples.contains("prov#Entity"));
        assert!(ntriples.contains("prov#wasDerivedFrom"));
        assert!(ntriples.contains("prov#wasGeneratedBy"));
        assert!(ntriples.contains("prov#wasAttributedTo"));
        assert!(ntriples.contains("prov#SoftwareAgent"));
        assert!(!ntriples.contains("prov#invalidatedAtTime"));
    }

    /// The end of the validity window is exported as `prov:invalidatedAtTime`, and the
    /// agent's specific PROV class follows its `AgentType`.
    #[test]
    fn test_ntriples_validity_period() {
        let pg = ProvenanceGraph::default();
        let valid_from = Utc::now();
        let valid_until = Utc::now();
        let record = LineageRecord::new(uri("https://example.org/data/expiring"))
            .with_agent(Agent::person(uri("https://example.org/agent/alice"), "Alice"))
            .with_validity(valid_from, valid_until);

        let ntriples = pg.to_ntriples(&record);
        assert!(ntriples.contains("prov#invalidatedAtTime"));
        assert!(ntriples.contains(&valid_until.to_rfc3339()));
        assert!(ntriples.contains("prov#Person"));
        assert!(!ntriples.contains("prov#wasGeneratedBy"));
    }

    #[test]
    fn test_sparql_query_generation() {
        let pg = ProvenanceGraph::default();
        let query = pg.generate_lineage_query("https://example.org/data/1");
        assert!(query.contains("PREFIX prov:"));
        assert!(query.contains("prov:wasDerivedFrom"));
        assert!(query.contains("prov:wasGeneratedBy"));
        assert!(query.contains("https://example.org/data/1"));
        assert!(query.contains("FROM <urn:ids:provenance:graph>"));
    }

    #[tokio::test]
    async fn test_query_by_activity() {
        let pg = ProvenanceGraph::default();
        let activity_id = uri("https://example.org/activity/batch-1");
        for i in 1..=3 {
            let entity = uri(&format!("https://example.org/data/{}", i));
            let record = LineageRecord::new(entity)
                .with_activity(Activity::new(activity_id.clone(), "BatchProcessing"));
            pg.record_lineage(record).await.expect("record");
        }
        let records = pg.query_by_activity(&activity_id).await.expect("query");
        assert_eq!(records.len(), 3);
    }

    #[tokio::test]
    async fn test_query_by_agent() {
        let pg = ProvenanceGraph::default();
        let curator = uri("https://example.org/agent/curator");
        let curated = uri("https://example.org/data/curated");

        pg.record_lineage(
            LineageRecord::new(curated.clone()).with_agent(Agent::person(curator.clone(), "Curator")),
        )
        .await
        .expect("record");
        pg.record_lineage(LineageRecord::new(uri("https://example.org/data/other")).with_agent(
            Agent::organization(uri("https://example.org/agent/org"), "Other Org"),
        ))
        .await
        .expect("record");
        pg.record_lineage(LineageRecord::new(uri("https://example.org/data/anonymous")))
            .await
            .expect("record");

        let records = pg.query_by_agent(&curator).await.expect("query");
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].entity, curated);

        let stats = pg.statistics().await;
        assert_eq!(stats.entity_count, 3);
        assert_eq!(stats.agent_count, 2);
        assert_eq!(stats.activity_count, 0);
    }
}