use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::io::{BufRead, Read};
use std::path::Path;
use anybytes::{Bytes, View};
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine as _;
use blake3::Hasher;
use hifitime::prelude::*;
use num_rational::Ratio;
use winnow::error::InputError;
use winnow::stream::Stream;
use winnow::token::{take, take_while};
use winnow::Parser;
use crate::attribute::Attribute;
use crate::blob::encodings::longstring::LongString;
use crate::blob::encodings::rawbytes::RawBytes;
use crate::blob::{Blob, IntoBlob};
use crate::id::{ExclusiveId, Id, ID_LEN};
use crate::macros::entity;
use crate::prelude::inlineencodings;
use crate::repo::{BlobStore, Workspace};
use crate::trible::{Trible, TribleSet};
use crate::inline::encodings::genid::GenId;
use crate::inline::encodings::hash::Handle;
use crate::inline::encodings::shortstring::ShortString;
use crate::inline::encodings::time::{i128_to_ordered_be, NsDuration, NsTAIInterval};
use crate::inline::encodings::UnknownInline;
use crate::inline::encodings::boolean::Boolean;
use crate::inline::encodings::f64::F64;
use crate::inline::{RawInline, IntoInline, TryToInline, Inline};
const XSD: &str = "http://www.w3.org/2001/XMLSchema#";
#[derive(Debug, Clone)]
pub enum IngestError {
BnodeCycle {
labels: Vec<String>,
},
Io(String),
}
impl fmt::Display for IngestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::BnodeCycle { labels } => {
write!(f, "blank-node cycle in input: {}", labels.join(", "))
}
Self::Io(msg) => write!(f, "i/o error reading n-triples: {msg}"),
}
}
}
impl std::error::Error for IngestError {}
enum OutgoingFact {
Resolved { attr_id: Id, value_raw: RawInline },
BnodeRef {
attr_id: Id,
target_label: View<str>,
},
}
struct IncomingFact {
subject_id: Id,
attr_id: Id,
target_label: View<str>,
}
struct BnodeBuffer {
outgoing: HashMap<View<str>, Vec<OutgoingFact>>,
incoming: Vec<IncomingFact>,
salt: [u8; 16],
}
impl BnodeBuffer {
fn new() -> Self {
let mut salt = [0u8; 16];
rand::Rng::fill(&mut rand::thread_rng(), &mut salt[..]);
Self {
outgoing: HashMap::new(),
incoming: Vec::new(),
salt,
}
}
fn is_empty(&self) -> bool {
self.outgoing.is_empty() && self.incoming.is_empty()
}
fn push_outgoing(&mut self, label: View<str>, fact: OutgoingFact) {
self.outgoing.entry(label).or_default().push(fact);
}
fn push_incoming(&mut self, fact: IncomingFact) {
self.incoming.push(fact);
}
fn flush(self, facts: &mut TribleSet) -> Result<(), IngestError> {
if self.is_empty() {
return Ok(());
}
let mut deps: HashMap<View<str>, HashSet<View<str>>> = HashMap::new();
let mut all_labels: HashSet<View<str>> = HashSet::new();
for (label, edges) in &self.outgoing {
all_labels.insert(label.clone());
let entry = deps.entry(label.clone()).or_default();
for edge in edges {
if let OutgoingFact::BnodeRef { target_label, .. } = edge {
entry.insert(target_label.clone());
all_labels.insert(target_label.clone());
}
}
}
for inc in &self.incoming {
all_labels.insert(inc.target_label.clone());
}
let order = topo_sort(&all_labels, &deps).map_err(|labels| {
let mut sorted: Vec<String> = labels.iter().map(|v| v.as_ref().to_owned()).collect();
sorted.sort();
IngestError::BnodeCycle { labels: sorted }
})?;
let mut resolved: HashMap<View<str>, Id> = HashMap::new();
for label in order {
let id = resolve_bnode_id(&label, &self.outgoing, &resolved, &self.salt);
resolved.insert(label, id);
}
for (label, edges) in self.outgoing {
let subject_id = resolved[&label];
let e = ExclusiveId::force_ref(&subject_id);
for edge in edges {
let (attr_id, value_raw) = match edge {
OutgoingFact::Resolved { attr_id, value_raw } => (attr_id, value_raw),
OutgoingFact::BnodeRef {
attr_id,
target_label,
} => {
let target_id = resolved[&target_label];
let v: Inline<GenId> = target_id.to_inline();
(attr_id, v.raw)
}
};
let v: Inline<UnknownInline> = Inline::new(value_raw);
facts.insert(&Trible::new(e, &attr_id, &v));
}
}
for inc in self.incoming {
let target_id = resolved[&inc.target_label];
let e = ExclusiveId::force_ref(&inc.subject_id);
let g: Inline<GenId> = target_id.to_inline();
let v: Inline<UnknownInline> = Inline::new(g.raw);
facts.insert(&Trible::new(e, &inc.attr_id, &v));
}
Ok(())
}
}
fn resolve_bnode_id(
label: &View<str>,
outgoing: &HashMap<View<str>, Vec<OutgoingFact>>,
resolved: &HashMap<View<str>, Id>,
salt: &[u8; 16],
) -> Id {
let pairs: Vec<(Id, RawInline)> = outgoing
.get(label)
.map(|edges| {
edges
.iter()
.map(|edge| match edge {
OutgoingFact::Resolved { attr_id, value_raw } => (*attr_id, *value_raw),
OutgoingFact::BnodeRef {
attr_id,
target_label,
} => {
let target_id = resolved
.get(target_label)
.expect("topo order resolved this target first");
let v: Inline<GenId> = target_id.to_inline();
(*attr_id, v.raw)
}
})
.collect()
})
.unwrap_or_default();
if pairs.is_empty() {
let mut hasher = Hasher::new();
hasher.update(salt);
hasher.update(label.as_ref().as_bytes());
let digest = hasher.finalize();
let mut raw = [0u8; ID_LEN];
raw.copy_from_slice(&digest.as_bytes()[digest.as_bytes().len() - ID_LEN..]);
return Id::new(raw).expect("non-nil from random salt");
}
let mut pairs = pairs;
pairs.sort_unstable();
let mut hasher = Hasher::new();
let mut last: Option<(Id, RawInline)> = None;
for (a, v) in &pairs {
if let Some((la, lv)) = last {
if *a == la && *v == lv {
continue;
}
}
hasher.update(&a[..]);
hasher.update(&v[..]);
last = Some((*a, *v));
}
let digest = hasher.finalize();
let mut raw = [0u8; ID_LEN];
raw.copy_from_slice(&digest.as_bytes()[digest.as_bytes().len() - ID_LEN..]);
Id::new(raw).expect("intrinsic id from non-empty pairs")
}
fn topo_sort(
nodes: &HashSet<View<str>>,
edges: &HashMap<View<str>, HashSet<View<str>>>,
) -> Result<Vec<View<str>>, Vec<View<str>>> {
let mut in_degree: HashMap<View<str>, usize> =
nodes.iter().map(|n| (n.clone(), 0)).collect();
for dsts in edges.values() {
for dst in dsts {
*in_degree.entry(dst.clone()).or_insert(0) += 1;
}
}
let mut queue: VecDeque<View<str>> = in_degree
.iter()
.filter(|(_, d)| **d == 0)
.map(|(n, _)| n.clone())
.collect();
let mut order: Vec<View<str>> = Vec::with_capacity(nodes.len());
while let Some(n) = queue.pop_front() {
if let Some(dsts) = edges.get(&n) {
for dst in dsts {
let d = in_degree.get_mut(dst).expect("dst recorded above");
*d -= 1;
if *d == 0 {
queue.push_back(dst.clone());
}
}
}
order.push(n);
}
if order.len() < nodes.len() {
let cycle: Vec<View<str>> = nodes
.iter()
.filter(|n| in_degree.get(*n).copied().unwrap_or(0) > 0)
.cloned()
.collect();
Err(cycle)
} else {
Ok(order)
}
}
enum LiteralSuffix {
None,
Datatype(View<str>),
Language(View<str>),
}
fn skip_ws_and_comments(bytes: &mut Bytes) {
loop {
while matches!(bytes.peek_token(), Some(b) if matches!(b, b' ' | b'\t' | b'\n' | b'\r')) {
bytes.pop_front();
}
if bytes.peek_token() == Some(b'#') {
while let Some(b) = bytes.pop_front() {
if b == b'\n' {
break;
}
}
continue;
}
break;
}
}
fn skip_inline_ws(bytes: &mut Bytes) {
while matches!(bytes.peek_token(), Some(b' ') | Some(b'\t')) {
bytes.pop_front();
}
}
fn take_iri(bytes: &mut Bytes) -> Option<View<str>> {
if bytes.peek_token() != Some(b'<') {
return None;
}
bytes.pop_front();
{
let mut tentative = bytes.clone();
let mut take = take_while::<_, _, InputError<Bytes>>(0.., |b: u8| {
b > 0x20 && !matches!(b, b'<' | b'>' | b'"' | b'{' | b'}' | b'|' | b'^' | b'`' | b'\\')
});
if let Ok(prefix) = take.parse_next(&mut tentative) {
if tentative.peek_token() == Some(b'>') {
tentative.pop_front();
*bytes = tentative;
return prefix.view::<str>().ok();
}
}
}
let mut out: Vec<u8> = Vec::new();
while let Some(b) = bytes.peek_token() {
match b {
b'>' => {
bytes.pop_front();
return Bytes::from_source(out).view::<str>().ok();
}
b'\\' => {
bytes.pop_front();
let kind = bytes.pop_front()?;
let decoded = match kind {
b'u' => parse_uchar(bytes, 4)?,
b'U' => parse_uchar(bytes, 8)?,
_ => return None, };
out.extend_from_slice(&decoded);
}
0..=0x20 | b'<' | b'"' | b'{' | b'}' | b'|' | b'^' | b'`' => return None,
_ => {
out.push(b);
bytes.pop_front();
}
}
}
None
}
fn take_bnode(bytes: &mut Bytes) -> Option<View<str>> {
if bytes.peek_token() != Some(b'_') {
return None;
}
let mut tentative = bytes.clone();
let mut prefix = take::<_, _, InputError<Bytes>>(2usize);
let head = prefix.parse_next(&mut tentative).ok()?;
if head.as_ref() != b"_:" {
return None;
}
let mut take_label = take_while::<_, _, InputError<Bytes>>(1.., |b: u8| {
!matches!(b, b' ' | b'\t' | b'\n' | b'\r')
});
let label = take_label.parse_next(&mut tentative).ok()?;
*bytes = tentative;
let mut combined = Vec::with_capacity(2 + label.len());
combined.extend_from_slice(b"_:");
combined.extend_from_slice(label.as_ref());
Bytes::from_source(combined).view::<str>().ok()
}
fn take_literal(bytes: &mut Bytes) -> Option<(Bytes, LiteralSuffix)> {
if bytes.peek_token() != Some(b'"') {
return None;
}
bytes.pop_front();
{
let mut tentative = bytes.clone();
let mut take = take_while::<_, _, InputError<Bytes>>(0.., |b: u8| {
b != b'"' && b != b'\\' && b != b'\n' && b != b'\r'
});
if let Ok(prefix) = take.parse_next(&mut tentative) {
if tentative.peek_token() == Some(b'"') {
tentative.pop_front();
*bytes = tentative;
let suffix = parse_literal_suffix(bytes)?;
return Some((prefix, suffix));
}
}
}
let mut out: Vec<u8> = Vec::new();
loop {
let b = bytes.peek_token()?;
match b {
b'"' => {
bytes.pop_front();
let suffix = parse_literal_suffix(bytes)?;
return Some((Bytes::from_source(out), suffix));
}
b'\\' => {
bytes.pop_front();
let kind = bytes.pop_front()?;
match kind {
b'n' => out.push(b'\n'),
b't' => out.push(b'\t'),
b'r' => out.push(b'\r'),
b'b' => out.push(0x08),
b'f' => out.push(0x0c),
b'"' => out.push(b'"'),
b'\'' => out.push(b'\''),
b'\\' => out.push(b'\\'),
b'u' => {
let decoded = parse_uchar(bytes, 4)?;
out.extend_from_slice(&decoded);
}
b'U' => {
let decoded = parse_uchar(bytes, 8)?;
out.extend_from_slice(&decoded);
}
_ => return None,
}
}
b'\n' | b'\r' => return None,
_ => {
out.push(b);
bytes.pop_front();
}
}
}
}
fn parse_uchar(bytes: &mut Bytes, hex_digits: usize) -> Option<Vec<u8>> {
let mut grab = take::<_, _, InputError<Bytes>>(hex_digits);
let hex = grab.parse_next(bytes).ok()?;
let mut code: u32 = 0;
for h in hex.as_ref() {
code = (code << 4)
| match h {
b'0'..=b'9' => (h - b'0') as u32,
b'a'..=b'f' => (h - b'a' + 10) as u32,
b'A'..=b'F' => (h - b'A' + 10) as u32,
_ => return None,
};
}
let ch = char::from_u32(code)?;
let mut buf = [0u8; 4];
Some(ch.encode_utf8(&mut buf).as_bytes().to_vec())
}
fn parse_literal_suffix(bytes: &mut Bytes) -> Option<LiteralSuffix> {
match bytes.peek_token() {
Some(b'^') => {
bytes.pop_front();
if bytes.pop_front() != Some(b'^') {
return None;
}
let dt = take_iri(bytes)?;
Some(LiteralSuffix::Datatype(dt))
}
Some(b'@') => {
bytes.pop_front();
let mut take = take_while::<_, _, InputError<Bytes>>(1.., |b: u8| {
b.is_ascii_alphanumeric() || b == b'-'
});
let tag = take.parse_next(bytes).ok()?;
tag.view::<str>().ok().map(LiteralSuffix::Language)
}
_ => Some(LiteralSuffix::None),
}
}
fn parse_decimal(s: &str) -> Option<Ratio<i128>> {
if let Some(dot_pos) = s.find('.') {
let decimals = s.len() - dot_pos - 1;
let without_dot: String = s.chars().filter(|c| *c != '.').collect();
let numerator: i128 = without_dot.parse().ok()?;
let denominator: i128 = 10i128.checked_pow(decimals as u32)?;
Some(Ratio::new(numerator, denominator))
} else {
let n: i128 = s.parse().ok()?;
Some(Ratio::from_integer(n))
}
}
fn parse_year(mut s: &str) -> Option<(i32, &str)> {
let neg = if let Some(rest) = s.strip_prefix('-') {
s = rest;
true
} else {
false
};
let digits_end = s
.as_bytes()
.iter()
.position(|b| !b.is_ascii_digit())
.unwrap_or(s.len());
if digits_end < 4 {
return None;
}
let year_abs: i64 = s[..digits_end].parse().ok()?;
let year: i32 = if neg {
i32::try_from(-year_abs).ok()?
} else {
i32::try_from(year_abs).ok()?
};
Some((year, &s[digits_end..]))
}
fn parse_timezone_offset(s: &str) -> Option<i64> {
if s.is_empty() {
return Some(0);
}
if s == "Z" {
return Some(0);
}
let bytes = s.as_bytes();
let sign = match bytes.first()? {
b'+' => 1i64,
b'-' => -1i64,
_ => return None,
};
if bytes.len() != 6 || bytes[3] != b':' {
return None;
}
let hh: i64 = std::str::from_utf8(&bytes[1..3]).ok()?.parse().ok()?;
let mm: i64 = std::str::from_utf8(&bytes[4..6]).ok()?.parse().ok()?;
Some(sign * (hh * 3600 + mm * 60))
}
fn epoch_from_gregorian_with_offset(
year: i32,
month: u8,
day: u8,
hh: u8,
mm: u8,
ss: u8,
ns: u32,
offset_secs: i64,
) -> Option<Epoch> {
let local = Epoch::maybe_from_gregorian_utc(year, month, day, hh, mm, ss, ns).ok()?;
Some(local - Duration::from_seconds(offset_secs as f64))
}
fn parse_xsd_datetime(s: &str) -> Option<i128> {
let (year, rest) = parse_year(s)?;
let mut chars = rest.as_bytes();
if chars.first() != Some(&b'-') {
return None;
}
let month: u8 = std::str::from_utf8(chars.get(1..3)?).ok()?.parse().ok()?;
if chars.get(3) != Some(&b'-') {
return None;
}
let day: u8 = std::str::from_utf8(chars.get(4..6)?).ok()?.parse().ok()?;
if chars.get(6) != Some(&b'T') {
return None;
}
let hh: u8 = std::str::from_utf8(chars.get(7..9)?).ok()?.parse().ok()?;
if chars.get(9) != Some(&b':') {
return None;
}
let mm: u8 = std::str::from_utf8(chars.get(10..12)?).ok()?.parse().ok()?;
if chars.get(12) != Some(&b':') {
return None;
}
let ss: u8 = std::str::from_utf8(chars.get(13..15)?).ok()?.parse().ok()?;
chars = &chars[15..];
let mut ns: u32 = 0;
if chars.first() == Some(&b'.') {
chars = &chars[1..];
let frac_end = chars
.iter()
.position(|b| !b.is_ascii_digit())
.unwrap_or(chars.len());
let frac_str = std::str::from_utf8(&chars[..frac_end]).ok()?;
let mut padded = String::with_capacity(9);
padded.push_str(frac_str);
while padded.len() < 9 {
padded.push('0');
}
ns = padded[..9].parse().ok()?;
chars = &chars[frac_end..];
}
let tz = std::str::from_utf8(chars).ok()?;
let offset = parse_timezone_offset(tz)?;
let epoch = epoch_from_gregorian_with_offset(year, month, day, hh, mm, ss, ns, offset)?;
Some(epoch.to_tai_duration().total_nanoseconds())
}
fn parse_xsd_date(s: &str) -> Option<(i128, i128)> {
let (year, rest) = parse_year(s)?;
let bytes = rest.as_bytes();
if bytes.first() != Some(&b'-') {
return None;
}
let month: u8 = std::str::from_utf8(bytes.get(1..3)?).ok()?.parse().ok()?;
if bytes.get(3) != Some(&b'-') {
return None;
}
let day: u8 = std::str::from_utf8(bytes.get(4..6)?).ok()?.parse().ok()?;
let tz = std::str::from_utf8(&bytes[6..]).ok()?;
let offset = parse_timezone_offset(tz)?;
let lower = epoch_from_gregorian_with_offset(year, month, day, 0, 0, 0, 0, offset)?
.to_tai_duration()
.total_nanoseconds();
let upper = lower.checked_add(86_400_000_000_000i128 - 1)?;
Some((lower, upper))
}
fn parse_xsd_gyear(s: &str) -> Option<(i128, i128)> {
let (year, rest) = parse_year(s)?;
let offset = parse_timezone_offset(rest)?;
let lower = epoch_from_gregorian_with_offset(year, 1, 1, 0, 0, 0, 0, offset)?
.to_tai_duration()
.total_nanoseconds();
let next_year = year.checked_add(1)?;
let upper_excl = epoch_from_gregorian_with_offset(next_year, 1, 1, 0, 0, 0, 0, offset)?
.to_tai_duration()
.total_nanoseconds();
Some((lower, upper_excl.checked_sub(1)?))
}
fn parse_xsd_gyearmonth(s: &str) -> Option<(i128, i128)> {
let (year, rest) = parse_year(s)?;
let bytes = rest.as_bytes();
if bytes.first() != Some(&b'-') {
return None;
}
let month: u8 = std::str::from_utf8(bytes.get(1..3)?).ok()?.parse().ok()?;
if !(1..=12).contains(&month) {
return None;
}
let tz = std::str::from_utf8(&bytes[3..]).ok()?;
let offset = parse_timezone_offset(tz)?;
let lower = epoch_from_gregorian_with_offset(year, month, 1, 0, 0, 0, 0, offset)?
.to_tai_duration()
.total_nanoseconds();
let (next_year, next_month) = if month == 12 {
(year.checked_add(1)?, 1u8)
} else {
(year, month + 1)
};
let upper_excl = epoch_from_gregorian_with_offset(next_year, next_month, 1, 0, 0, 0, 0, offset)?
.to_tai_duration()
.total_nanoseconds();
Some((lower, upper_excl.checked_sub(1)?))
}
fn parse_xsd_duration(s: &str) -> Option<i128> {
let mut s = s;
let neg = if let Some(rest) = s.strip_prefix('-') {
s = rest;
true
} else {
false
};
let mut s = s.strip_prefix('P')?;
let mut total_ns: i128 = 0;
let mut in_time = false;
while !s.is_empty() {
if let Some(rest) = s.strip_prefix('T') {
in_time = true;
s = rest;
continue;
}
let num_end = s
.as_bytes()
.iter()
.position(|b| !(b.is_ascii_digit() || *b == b'.'))?;
let num_str = &s[..num_end];
let unit = s.as_bytes().get(num_end).copied()?;
s = &s[num_end + 1..];
let value: f64 = num_str.parse().ok()?;
match (in_time, unit) {
(false, b'Y') | (false, b'M') => {
return None;
}
(false, b'D') => total_ns = total_ns.checked_add((value * 86_400e9) as i128)?,
(true, b'H') => total_ns = total_ns.checked_add((value * 3_600e9) as i128)?,
(true, b'M') => total_ns = total_ns.checked_add((value * 60e9) as i128)?,
(true, b'S') => total_ns = total_ns.checked_add((value * 1e9) as i128)?,
_ => return None,
}
}
Some(if neg { -total_ns } else { total_ns })
}
pub fn uri_to_id<Blobs>(ws: &mut Workspace<Blobs>, uri: &str) -> Id
where
Blobs: BlobStore,
{
let handle: Inline<Handle<LongString>> = ws.put(uri.to_owned());
let fragment = entity! { crate::import::rdf_uri: handle };
fragment.root().expect("intrinsic URI entity")
}
pub fn uri_to_id_pure(uri: &str) -> Id {
let handle: Inline<Handle<LongString>> =
uri.to_owned().to_blob().get_handle();
let fragment = entity! { crate::import::rdf_uri: handle };
fragment.root().expect("intrinsic URI entity")
}
pub fn import_bytes<Blobs>(
ws: &mut Workspace<Blobs>,
mut bytes: Bytes,
) -> Result<(TribleSet, usize), IngestError>
where
Blobs: BlobStore,
{
let mut facts = TribleSet::new();
let mut bnodes = BnodeBuffer::new();
let mut count = 0;
let mut attr_cache = NTriplesAttrCache::default();
loop {
skip_ws_and_comments(&mut bytes);
if bytes.peek_token().is_none() {
break;
}
if parse_triple(ws, &mut facts, &mut bnodes, &mut bytes, &mut attr_cache) {
count += 1;
} else {
while let Some(b) = bytes.pop_front() {
if b == b'\n' {
break;
}
}
}
}
bnodes.flush(&mut facts)?;
Ok((facts, count))
}
pub fn import_blob<Blobs>(
ws: &mut Workspace<Blobs>,
blob: Blob<LongString>,
) -> Result<(TribleSet, usize), IngestError>
where
Blobs: BlobStore,
{
import_bytes(ws, blob.bytes)
}
pub fn ingest_ntriples<Blobs>(
ws: &mut Workspace<Blobs>,
mut reader: impl BufRead,
) -> Result<(TribleSet, usize), IngestError>
where
Blobs: BlobStore,
{
let mut buf = Vec::new();
reader
.read_to_end(&mut buf)
.map_err(|e| IngestError::Io(e.to_string()))?;
import_bytes(ws, Bytes::from_source(buf))
}
#[derive(Default)]
struct NTriplesAttrCache {
genid: HashMap<String, Id>,
longstring: HashMap<String, Id>,
rawbytes: HashMap<String, Id>,
i256be: HashMap<String, Id>,
u256be: HashMap<String, Id>,
r256be: HashMap<String, Id>,
f64: HashMap<String, Id>,
boolean: HashMap<String, Id>,
nsduration: HashMap<String, Id>,
nstai: HashMap<String, Id>,
}
impl NTriplesAttrCache {
fn genid(&mut self, iri: &str) -> Id {
*self.genid.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::GenId>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::GenId as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn longstring(&mut self, iri: &str) -> Id {
*self.longstring.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<Handle<LongString>>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <Handle<LongString> as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn rawbytes(&mut self, iri: &str) -> Id {
*self.rawbytes.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<Handle<RawBytes>>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <Handle<RawBytes> as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn i256be(&mut self, iri: &str) -> Id {
*self.i256be.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::I256BE>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::I256BE as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn u256be(&mut self, iri: &str) -> Id {
*self.u256be.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::U256BE>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::U256BE as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn r256be(&mut self, iri: &str) -> Id {
*self.r256be.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::R256BE>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::R256BE as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn f64(&mut self, iri: &str) -> Id {
*self.f64.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::F64>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::F64 as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn boolean(&mut self, iri: &str) -> Id {
*self.boolean.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<inlineencodings::Boolean>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <inlineencodings::Boolean as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn nsduration(&mut self, iri: &str) -> Id {
*self.nsduration.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<NsDuration>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <NsDuration as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
fn nstai(&mut self, iri: &str) -> Id {
*self.nstai.entry(iri.to_string()).or_insert_with(|| {
let h: Inline<Handle<crate::blob::encodings::longstring::LongString>> =
String::from(iri).to_blob().get_handle();
Attribute::<NsTAIInterval>::from(entity! {
crate::metadata::iri: h,
crate::metadata::value_encoding: <NsTAIInterval as crate::metadata::MetaDescribe>::id(),
})
.id()
})
}
}
fn parse_triple<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
bnodes: &mut BnodeBuffer,
bytes: &mut Bytes,
attr_cache: &mut NTriplesAttrCache,
) -> bool
where
Blobs: BlobStore,
{
let (subject_iri, subject_label): (Option<View<str>>, Option<View<str>>) =
match bytes.peek_token() {
Some(b'<') => match take_iri(bytes) {
Some(uri) => (Some(uri), None),
None => return false,
},
Some(b'_') => match take_bnode(bytes) {
Some(label) => (None, Some(label)),
None => return false,
},
_ => return false,
};
skip_inline_ws(bytes);
let Some(predicate) = take_iri(bytes) else {
return false;
};
skip_inline_ws(bytes);
let iri_subject_anchor: Option<Id> = subject_iri.as_ref().map(|uri| {
let s = uri.as_ref();
let id = uri_to_id(ws, s);
let sub_h: Inline<Handle<LongString>> = ws.put(uri.clone());
*facts += entity! { crate::import::rdf_uri: sub_h };
id
});
let outcome = match bytes.peek_token() {
Some(b'<') => {
let Some(obj_uri) = take_iri(bytes) else {
return false;
};
emit_object_iri(
ws,
facts,
bnodes,
iri_subject_anchor,
subject_label,
predicate.as_ref(),
obj_uri,
attr_cache,
);
true
}
Some(b'_') => {
let Some(target_label) = take_bnode(bytes) else {
return false;
};
let attr_id = attr_cache.genid(predicate.as_ref());
match (iri_subject_anchor, subject_label) {
(Some(s_id), None) => {
bnodes.push_incoming(IncomingFact {
subject_id: s_id,
attr_id,
target_label,
});
}
(None, Some(s_label)) => {
bnodes.push_outgoing(
s_label,
OutgoingFact::BnodeRef {
attr_id,
target_label,
},
);
}
_ => return false,
}
true
}
Some(b'"') => {
let Some((text_bytes, suffix)) = take_literal(bytes) else {
return false;
};
let Ok(text) = text_bytes.view::<str>() else {
return false;
};
match (iri_subject_anchor, subject_label) {
(Some(s_id), None) => {
let e = ExclusiveId::force_ref(&s_id);
match suffix {
LiteralSuffix::None => {
emit_text_literal(ws, facts, e, predicate.as_ref(), text, attr_cache)
}
LiteralSuffix::Datatype(dt) => emit_typed_literal(
ws,
facts,
e,
predicate.as_ref(),
text,
dt.as_ref(),
attr_cache,
),
LiteralSuffix::Language(lang) => emit_lang_literal(
ws,
facts,
e,
predicate.as_ref(),
lang.as_ref(),
text,
attr_cache,
),
}
}
(None, Some(s_label)) => {
if let Some(fact) = build_resolved_outgoing(
ws,
facts,
predicate.as_ref(),
text,
suffix,
attr_cache,
) {
bnodes.push_outgoing(s_label, fact);
}
}
_ => return false,
}
true
}
_ => false,
};
if outcome {
skip_inline_ws(bytes);
if bytes.peek_token() != Some(b'.') {
return false;
}
bytes.pop_front();
}
outcome
}
fn emit_object_iri<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
bnodes: &mut BnodeBuffer,
iri_subject_anchor: Option<Id>,
subject_label: Option<View<str>>,
predicate: &str,
obj_uri: View<str>,
attr_cache: &mut NTriplesAttrCache,
) where
Blobs: BlobStore,
{
match (iri_subject_anchor, subject_label) {
(Some(s_id), None) => {
emit_uri_object(
ws,
facts,
&ExclusiveId::force_ref(&s_id),
predicate,
obj_uri.as_ref(),
attr_cache,
);
}
(None, Some(s_label)) => {
let attr_id = attr_cache.genid(predicate);
let obj_id = uri_to_id(ws, obj_uri.as_ref());
let obj_h: Inline<Handle<LongString>> = ws.put(obj_uri);
*facts += entity! { crate::import::rdf_uri: obj_h };
let g: Inline<GenId> = obj_id.to_inline();
bnodes.push_outgoing(
s_label,
OutgoingFact::Resolved {
attr_id,
value_raw: g.raw,
},
);
}
_ => {}
}
}
fn build_resolved_outgoing<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
predicate: &str,
text: View<str>,
suffix: LiteralSuffix,
attr_cache: &mut NTriplesAttrCache,
) -> Option<OutgoingFact>
where
Blobs: BlobStore,
{
match suffix {
LiteralSuffix::None => {
let attr_id = attr_cache.longstring(predicate);
let handle: Inline<Handle<LongString>> = ws.put(text);
Some(OutgoingFact::Resolved {
attr_id,
value_raw: handle.raw,
})
}
LiteralSuffix::Datatype(dt) => {
let mut scratch = TribleSet::new();
let scratch_id = Id::new([0xFF; ID_LEN]).expect("non-nil scratch id");
let scratch_e = ExclusiveId::force_ref(&scratch_id);
emit_typed_literal(
ws,
&mut scratch,
scratch_e,
predicate,
text,
dt.as_ref(),
attr_cache,
);
let pair = scratch
.iter()
.next()
.map(|t| (*t.a(), t.v::<UnknownInline>().raw));
pair.map(|(attr_id, value_raw)| OutgoingFact::Resolved { attr_id, value_raw })
}
LiteralSuffix::Language(lang) => {
let Ok(lang_value): Result<Inline<ShortString>, _> = lang.as_ref().try_to_inline() else {
return None;
};
let text_handle: Inline<Handle<LongString>> = ws.put(text);
let label_fragment = entity! {
crate::import::rdf_lang: lang_value,
crate::import::rdf_text: text_handle,
};
let label_id = label_fragment
.root()
.expect("intrinsic id from rdf_lang+rdf_text");
*facts += label_fragment;
let attr_id = attr_cache.genid(predicate);
let g: Inline<GenId> = label_id.to_inline();
Some(OutgoingFact::Resolved {
attr_id,
value_raw: g.raw,
})
}
}
}
fn emit_uri_object<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
e: &ExclusiveId,
predicate: &str,
obj_uri: &str,
attr_cache: &mut NTriplesAttrCache,
) where
Blobs: BlobStore,
{
let attr_id = attr_cache.genid(predicate);
let obj_id = uri_to_id(ws, obj_uri);
let obj_h: Inline<Handle<LongString>> = ws.put(obj_uri.to_owned());
*facts += entity! { crate::import::rdf_uri: obj_h };
let g: Inline<GenId> = obj_id.to_inline();
facts.insert(&Trible::new(e, &attr_id, &g));
}
fn emit_text_literal<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
e: &ExclusiveId,
predicate: &str,
text: View<str>,
attr_cache: &mut NTriplesAttrCache,
) where
Blobs: BlobStore,
{
let attr_id = attr_cache.longstring(predicate);
let handle: Inline<Handle<LongString>> = ws.put(text);
facts.insert(&Trible::new(e, &attr_id, &handle));
}
fn emit_typed_literal<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
e: &ExclusiveId,
predicate: &str,
text: View<str>,
datatype: &str,
attr_cache: &mut NTriplesAttrCache,
) where
Blobs: BlobStore,
{
if let Some(local) = datatype.strip_prefix(XSD) {
match local {
"integer" | "int" | "long" | "short" | "byte" | "negativeInteger"
| "nonPositiveInteger" => {
if let Ok(val) = text.parse::<i128>() {
let attr_id = attr_cache.i256be(predicate);
let v: Inline<inlineencodings::I256BE> = val.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
}
"nonNegativeInteger" | "positiveInteger" | "unsignedInt" | "unsignedLong"
| "unsignedShort" | "unsignedByte" => {
if let Ok(val) = text.parse::<u128>() {
let attr_id = attr_cache.u256be(predicate);
let v: Inline<inlineencodings::U256BE> = val.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
}
"decimal" => {
if let Some(val) = parse_decimal(text.as_ref()) {
let attr_id = attr_cache.r256be(predicate);
let v: Inline<inlineencodings::R256BE> = val.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
}
"float" | "double" => {
if let Ok(val) = text.parse::<f64>() {
let attr_id = attr_cache.f64(predicate);
let v: Inline<F64> = val.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
}
"boolean" => match text.as_ref() {
"true" | "1" => {
let attr_id = attr_cache.boolean(predicate);
let v: Inline<Boolean> = true.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
"false" | "0" => {
let attr_id = attr_cache.boolean(predicate);
let v: Inline<Boolean> = false.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
_ => {}
},
"dateTime" => {
if let Some(ns) = parse_xsd_datetime(text.as_ref()) {
emit_interval(facts, e, predicate, ns, ns, attr_cache);
return;
}
}
"date" => {
if let Some((lo, hi)) = parse_xsd_date(text.as_ref()) {
emit_interval(facts, e, predicate, lo, hi, attr_cache);
return;
}
}
"gYear" => {
if let Some((lo, hi)) = parse_xsd_gyear(text.as_ref()) {
emit_interval(facts, e, predicate, lo, hi, attr_cache);
return;
}
}
"gYearMonth" => {
if let Some((lo, hi)) = parse_xsd_gyearmonth(text.as_ref()) {
emit_interval(facts, e, predicate, lo, hi, attr_cache);
return;
}
}
"duration" | "dayTimeDuration" => {
if let Some(ns) = parse_xsd_duration(text.as_ref()) {
let attr_id = attr_cache.nsduration(predicate);
let v: Inline<NsDuration> = ns.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
return;
}
}
"hexBinary" => {
if let Ok(bytes) = hex::decode(text.as_ref()) {
let attr_id = attr_cache.rawbytes(predicate);
let handle: Inline<Handle<RawBytes>> = ws.put(bytes);
facts.insert(&Trible::new(e, &attr_id, &handle));
return;
}
}
"base64Binary" => {
if let Ok(bytes) = BASE64.decode(text.as_ref()) {
let attr_id = attr_cache.rawbytes(predicate);
let handle: Inline<Handle<RawBytes>> = ws.put(bytes);
facts.insert(&Trible::new(e, &attr_id, &handle));
return;
}
}
"anyURI" => {
emit_uri_object(ws, facts, e, predicate, text.as_ref(), attr_cache);
return;
}
_ => {}
}
}
emit_text_literal(ws, facts, e, predicate, text, attr_cache);
}
fn emit_interval(
facts: &mut TribleSet,
e: &ExclusiveId,
predicate: &str,
lo: i128,
hi: i128,
attr_cache: &mut NTriplesAttrCache,
) {
let attr_id = attr_cache.nstai(predicate);
let mut raw = [0u8; 32];
raw[0..16].copy_from_slice(&i128_to_ordered_be(lo));
raw[16..32].copy_from_slice(&i128_to_ordered_be(hi));
let v: Inline<NsTAIInterval> = Inline::new(raw);
facts.insert(&Trible::new(e, &attr_id, &v));
}
fn emit_lang_literal<Blobs>(
ws: &mut Workspace<Blobs>,
facts: &mut TribleSet,
e: &ExclusiveId,
predicate: &str,
lang: &str,
text: View<str>,
attr_cache: &mut NTriplesAttrCache,
) where
Blobs: BlobStore,
{
let Ok(lang_value): Result<Inline<ShortString>, _> = lang.try_to_inline() else {
return; };
let text_handle: Inline<Handle<LongString>> = ws.put(text);
let label_fragment = entity! {
crate::import::rdf_lang: lang_value,
crate::import::rdf_text: text_handle,
};
let label_id = label_fragment
.root()
.expect("intrinsic id from rdf_lang+rdf_text");
*facts += label_fragment;
let attr_id = attr_cache.genid(predicate);
let v: Inline<GenId> = label_id.to_inline();
facts.insert(&Trible::new(e, &attr_id, &v));
}
pub fn ingest_ntriples_file<Blobs>(
ws: &mut Workspace<Blobs>,
path: &Path,
) -> Result<(TribleSet, usize), IngestError>
where
Blobs: BlobStore,
{
let file = std::fs::File::open(path).map_err(|e| IngestError::Io(e.to_string()))?;
let mut reader = std::io::BufReader::new(file);
let mut buf = Vec::new();
reader
.read_to_end(&mut buf)
.map_err(|e| IngestError::Io(e.to_string()))?;
import_bytes(ws, Bytes::from_source(buf))
}
#[cfg(test)]
mod tests {
use super::*;
fn bytes_of(s: &str) -> Bytes {
Bytes::from_source(s.as_bytes().to_vec())
}
#[test]
fn take_iri_consumes_brackets() {
let mut input = bytes_of("<http://example.org/s> rest");
let iri = take_iri(&mut input).unwrap();
assert_eq!(iri.as_ref(), "http://example.org/s");
let remaining: Vec<u8> = (0..)
.scan(input.clone(), |b, _| b.pop_front())
.collect();
assert_eq!(&remaining[..5], b" rest");
}
#[test]
fn take_bnode_includes_prefix() {
let mut input = bytes_of("_:bf55954f96378f65ddb1da9836e2eb87 .");
let label = take_bnode(&mut input).unwrap();
assert_eq!(label.as_ref(), "_:bf55954f96378f65ddb1da9836e2eb87");
}
#[test]
fn take_bnode_allows_internal_dot() {
let mut input = bytes_of("_:foo.bar .");
let label = take_bnode(&mut input).unwrap();
assert_eq!(label.as_ref(), "_:foo.bar");
}
#[test]
fn take_literal_unescaped() {
let mut input = bytes_of(r#""hello" ."#);
let (text, suffix) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "hello");
assert!(matches!(suffix, LiteralSuffix::None));
}
#[test]
fn take_literal_with_datatype_suffix() {
let mut input = bytes_of(r#""42"^^<http://www.w3.org/2001/XMLSchema#integer> ."#);
let (text, suffix) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "42");
assert!(matches!(
suffix,
LiteralSuffix::Datatype(ref dt)
if dt.as_ref() == "http://www.w3.org/2001/XMLSchema#integer"
));
}
#[test]
fn take_literal_with_lang_tag() {
let mut input = bytes_of(r#""hello"@en ."#);
let (text, suffix) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "hello");
assert!(matches!(
suffix,
LiteralSuffix::Language(ref tag) if tag.as_ref() == "en"
));
}
#[test]
fn take_literal_with_lang_region() {
let mut input = bytes_of(r#""labor"@en-US ."#);
let (text, suffix) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "labor");
assert!(matches!(
suffix,
LiteralSuffix::Language(ref tag) if tag.as_ref() == "en-US"
));
}
#[test]
fn take_literal_with_basic_escapes() {
let mut input = bytes_of(r#""line\nbreak" ."#);
let (text, _) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "line\nbreak");
}
#[test]
fn take_literal_with_extended_echar() {
let mut input = bytes_of(r#""a\bb\fc\'d" ."#);
let (text, _) = take_literal(&mut input).unwrap();
assert_eq!(
text.view::<str>().unwrap().as_ref(),
"a\u{0008}b\u{000c}c'd"
);
}
#[test]
fn take_literal_with_unicode_escape_4() {
let mut input = bytes_of(r#""smile ☺ here" ."#);
let (text, _) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "smile ☺ here");
}
#[test]
fn take_literal_with_unicode_escape_8() {
let mut input = bytes_of(r#""grin \U0001F600 here" ."#);
let (text, _) = take_literal(&mut input).unwrap();
assert_eq!(text.view::<str>().unwrap().as_ref(), "grin 😀 here");
}
#[test]
fn take_iri_with_unicode_escape() {
let mut input = bytes_of(r#"<http://ex/é> rest"#);
let iri = take_iri(&mut input).unwrap();
assert_eq!(iri.as_ref(), "http://ex/é");
}
#[test]
fn decimal_parse_helper() {
let r = parse_decimal("3.14").unwrap();
assert_eq!(*r.numer(), 157);
assert_eq!(*r.denom(), 50);
let r = parse_decimal("42").unwrap();
assert_eq!(*r.numer(), 42);
assert_eq!(*r.denom(), 1);
let r = parse_decimal("-0.5").unwrap();
assert_eq!(*r.numer(), -1);
assert_eq!(*r.denom(), 2);
}
#[test]
fn xsd_datetime_z_and_offset() {
let utc = parse_xsd_datetime("2020-01-01T12:00:00Z").unwrap();
let plus5 = parse_xsd_datetime("2020-01-01T17:00:00+05:00").unwrap();
assert_eq!(utc, plus5);
}
#[test]
fn xsd_datetime_with_fractional_seconds() {
let no_frac = parse_xsd_datetime("2020-01-01T00:00:00Z").unwrap();
let with_frac = parse_xsd_datetime("2020-01-01T00:00:00.5Z").unwrap();
assert_eq!(with_frac - no_frac, 500_000_000);
}
#[test]
fn xsd_datetime_bce_year() {
assert!(parse_xsd_datetime("-0500-01-01T00:00:00Z").is_some());
}
#[test]
fn xsd_date_spans_one_day() {
let (lo, hi) = parse_xsd_date("2020-01-01").unwrap();
assert_eq!(hi - lo, 86_400_000_000_000 - 1);
}
#[test]
fn xsd_gyear_spans_full_year() {
let (lo_2020, hi_2020) = parse_xsd_gyear("2020").unwrap();
let (lo_2021, _) = parse_xsd_gyear("2021").unwrap();
assert_eq!(hi_2020 - lo_2020, 366 * 86_400_000_000_000 - 1);
assert_eq!(hi_2020 + 1, lo_2021);
}
#[test]
fn xsd_gyearmonth_spans_one_month() {
let (lo_jan, hi_jan) = parse_xsd_gyearmonth("2020-01").unwrap();
assert_eq!(hi_jan - lo_jan, 31 * 86_400_000_000_000 - 1);
let (_, hi_feb) = parse_xsd_gyearmonth("2020-02").unwrap();
let (lo_mar, _) = parse_xsd_gyearmonth("2020-03").unwrap();
assert_eq!(hi_feb + 1, lo_mar);
}
#[test]
fn xsd_duration_daytime_only() {
let ns = parse_xsd_duration("P1DT2H3M4.5S").unwrap();
let expected = 86_400_000_000_000i128
+ 2 * 3_600_000_000_000
+ 3 * 60_000_000_000
+ 4_500_000_000;
assert_eq!(ns, expected);
}
#[test]
fn xsd_duration_negative() {
let ns = parse_xsd_duration("-PT5S").unwrap();
assert_eq!(ns, -5_000_000_000);
}
#[test]
fn xsd_duration_rejects_year_month() {
assert!(parse_xsd_duration("P1Y").is_none());
assert!(parse_xsd_duration("P1M").is_none());
assert!(parse_xsd_duration("P1Y2M").is_none());
}
}