use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crossbeam_channel::{bounded, Sender};
use polars_core::prelude::*;
use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY;
use polars_io::prelude::*;
use crate::pipeline::morsels_per_sink;
/// Boxed, exactly-sized iterator of `DataFrame`s to spill; `dump_iter` relies
/// on the upper `size_hint` bound being `Some` to count outgoing frames.
pub(in crate::executors::sinks) type DfIter =
    Box<dyn ExactSizeIterator<Item = DataFrame> + Sync + Send>;
// Message sent to the IO thread: optional partition indices + frames to write.
type Payload = (Option<IdxCa>, DfIter);
/// Handle to a background thread that spills `DataFrame`s as IPC files into a
/// per-operation temp directory (`$TMP/polars/<operation>/<uuid>`).
pub(crate) struct IOThread {
    // Channel feeding payloads to the background writer thread.
    sender: Sender<Payload>,
    // Keeps the spill directory's `.lock` file alive; `gc_thread` only removes
    // directories whose lockfile is missing or stale.
    _lockfile: Arc<LockFile>,
    // Root spill directory for this operation instance.
    pub(in crate::executors::sinks) dir: PathBuf,
    // Number of frames handed to the writer thread via `dump_iter`.
    pub(in crate::executors::sinks) sent: Arc<AtomicUsize>,
    // Progress counter published by the writer thread (see `try_new`).
    pub(in crate::executors::sinks) total: Arc<AtomicUsize>,
    // Counter naming files written directly by caller threads (`_<n>.ipc`),
    // kept disjoint from the writer thread's `<n>.ipc` names.
    pub(in crate::executors::sinks) thread_local_count: Arc<AtomicUsize>,
    // Schema used to construct the batched IPC writers.
    schema: SchemaRef,
}
/// Returns the path of the `.lock` marker file inside `dir`.
fn get_lockfile_path(dir: &Path) -> PathBuf {
    dir.join(".lock")
}
/// Spawns a detached background thread that garbage-collects abandoned spill
/// directories of `operation_name` from previous runs.
///
/// A subdirectory is removed when it has no lockfile at all, or when its
/// lockfile's mtime is older than ~30 days (a live process keeps a fresh
/// lockfile around, see `LockFile`). All filesystem errors are treated as
/// best-effort: a GC thread must never take the process down.
fn gc_thread(operation_name: &'static str) {
    let _ = std::thread::spawn(move || {
        let mut dir = std::env::temp_dir();
        dir.push(format!("polars/{operation_name}"));

        // If the spill root doesn't exist yet there is nothing to collect;
        // previously this panicked, which could abort a first-ever run.
        let rd = match std::fs::read_dir(&dir) {
            Ok(rd) => rd,
            Err(_) => return,
        };

        for entry in rd {
            // An entry can vanish while we iterate (concurrent GC); skip it.
            let entry = match entry {
                Ok(entry) => entry,
                Err(_) => continue,
            };
            let path = entry.path();
            if path.is_dir() {
                let lockfile_path = get_lockfile_path(&path);
                if let Ok(lockfile) = File::open(lockfile_path) {
                    // `and_then` chains the two fallible metadata calls so a
                    // racing deletion cannot panic us via `unwrap`.
                    if let Ok(time) = lockfile.metadata().and_then(|md| md.modified()) {
                        // `unwrap_or_default` => 0 secs on clock skew (mtime in
                        // the future), i.e. the directory is considered fresh.
                        let modified_since = SystemTime::now()
                            .duration_since(time)
                            .unwrap_or_default()
                            .as_secs();
                        if modified_since > (SECONDS_IN_DAY as u64 * 30) {
                            // Another process may remove it first; ignore errors.
                            let _ = std::fs::remove_dir_all(&path);
                        }
                    } else {
                        eprintln!("could not get modified time on this platform")
                    }
                } else {
                    // No lockfile: the owning process never finished setting up
                    // or is long gone -- safe to remove.
                    let _ = std::fs::remove_dir_all(&path);
                }
            }
        }
    });
}
impl IOThread {
    /// Creates the spill directory `$TMP/polars/<operation_name>/<uuid>`,
    /// acquires its lockfile, kicks off the GC thread, and spawns the
    /// background writer thread that drains the payload channel.
    pub(in crate::executors::sinks) fn try_new(
        schema: SchemaRef,
        operation_name: &'static str,
    ) -> PolarsResult<Self> {
        // Nanosecond timestamp doubles as a practically-unique directory name.
        let uuid = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let mut dir = std::env::temp_dir();
        dir.push(&format!("polars/{operation_name}/{uuid}"));
        std::fs::create_dir_all(&dir)?;

        // The lockfile marks this directory as owned by a live process so
        // `gc_thread` (of this or another process) will not delete it.
        let lockfile_path = get_lockfile_path(&dir);
        let lockfile = Arc::new(LockFile::new(lockfile_path)?);

        // Best-effort cleanup of spill dirs abandoned by previous runs.
        gc_thread(operation_name);

        // Bounded channel: provides back-pressure against producer threads
        // (see `dump_chunk`, which spills locally when the channel is full).
        let (sender, receiver) = bounded::<Payload>(morsels_per_sink() * 2);

        let sent: Arc<AtomicUsize> = Default::default();
        let total: Arc<AtomicUsize> = Default::default();
        let thread_local_count: Arc<AtomicUsize> = Default::default();

        let dir2 = dir.clone();
        let total2 = total.clone();
        let lockfile2 = lockfile.clone();
        let schema2 = schema.clone();

        std::thread::spawn(move || {
            let schema = schema2;
            // Hold the lockfile Arc for as long as the writer thread runs so
            // the spill dir stays protected even after `IOThread` is dropped.
            let _keep_hold_on_lockfile = lockfile2;

            let mut count = 0usize;

            // Drains until every `Sender` is dropped (i.e. `IOThread` died).
            while let Ok((partitions, iter)) = receiver.recv() {
                if let Some(partitions) = partitions {
                    // Partitioned payload: the i-th partition index pairs with
                    // the i-th frame; each lands in `<dir>/<part>/<count>.ipc`.
                    for (part, df) in partitions.into_no_null_iter().zip(iter) {
                        let mut path = dir2.clone();
                        path.push(format!("{part}"));
                        // Directory may already exist; ignore the error.
                        let _ = std::fs::create_dir(&path);
                        path.push(format!("{count}.ipc"));
                        let file = File::create(path).unwrap();
                        let writer = IpcWriter::new(file);
                        let mut writer = writer.batched(&schema).unwrap();
                        writer.write_batch(&df).unwrap();
                        writer.finish().unwrap();
                        count += 1;
                    }
                } else {
                    // Unpartitioned payload: all frames of this payload go
                    // into a single `<dir>/<count>.ipc` file as batches.
                    let mut path = dir2.clone();
                    path.push(format!("{count}.ipc"));
                    let file = File::create(path).unwrap();
                    let writer = IpcWriter::new(file);
                    let mut writer = writer.batched(&schema).unwrap();
                    for df in iter {
                        writer.write_batch(&df).unwrap();
                    }
                    writer.finish().unwrap();
                    count += 1;
                }
                // Publish progress; `block_thread_until_io_thread_done`
                // compares this against `sent`.
                total2.store(count, Ordering::Relaxed);
            }
        });

        Ok(Self {
            sender,
            dir,
            sent,
            total,
            _lockfile: lockfile,
            thread_local_count,
            schema,
        })
    }

    /// Spills a single frame. If the channel is saturated, the calling thread
    /// writes a `_<n>.ipc` file itself instead of blocking on `send`; such
    /// files bypass the `sent`/`total` accounting entirely.
    pub(in crate::executors::sinks) fn dump_chunk(&self, mut df: DataFrame) {
        if self.sender.is_full() {
            let mut path = self.dir.clone();
            let count = self.thread_local_count.fetch_add(1, Ordering::Relaxed);
            // Leading `_` keeps these names disjoint from the IO thread's
            // `<n>.ipc` files.
            path.push(format!("_{count}.ipc"));
            let file = File::create(path).unwrap();
            // Non-batched writer: single-shot `finish` with the whole frame.
            let mut writer = IpcWriter::new(file);
            writer.finish(&mut df).unwrap();
        } else {
            let iter = Box::new(std::iter::once(df));
            self.dump_iter(None, iter)
        }
    }

    /// Sends one frame to the IO thread, tagged with its partition number.
    pub(in crate::executors::sinks) fn dump_partition(&self, partition_no: IdxSize, df: DataFrame) {
        let partition = Some(IdxCa::from_vec("", vec![partition_no]));
        let iter = Box::new(std::iter::once(df));
        self.dump_iter(partition, iter)
    }

    /// Writes one frame into `<dir>/<partition_no>/_<n>.ipc` directly on the
    /// calling thread, bypassing the channel and the `sent`/`total` counters.
    pub(in crate::executors::sinks) fn dump_partition_local(
        &self,
        partition_no: IdxSize,
        df: DataFrame,
    ) {
        let count = self.thread_local_count.fetch_add(1, Ordering::Relaxed);
        let mut path = self.dir.clone();
        path.push(format!("{partition_no}"));
        // Directory may already exist; ignore the error.
        let _ = std::fs::create_dir(&path);
        // Leading `_` keeps the name disjoint from the IO thread's files.
        path.push(format!("_{count}.ipc"));
        let file = File::create(path).unwrap();
        let writer = IpcWriter::new(file);
        let mut writer = writer.batched(&self.schema).unwrap();
        writer.write_batch(&df).unwrap();
        writer.finish().unwrap();
    }

    /// Queues a payload for the IO thread and bumps `sent` by the number of
    /// frames (the `ExactSizeIterator` bound guarantees the upper size hint).
    ///
    /// NOTE(review): for an unpartitioned payload with more than one frame,
    /// `sent` grows by the frame count while the IO thread bumps `total` only
    /// once per payload -- the counters can only converge if such payloads are
    /// never sent; confirm against the call sites.
    pub(in crate::executors::sinks) fn dump_iter(&self, partition: Option<IdxCa>, iter: DfIter) {
        let add = iter.size_hint().1.unwrap();
        self.sender.send((partition, iter)).unwrap();
        self.sent.fetch_add(add, Ordering::Relaxed);
    }
}
impl Drop for IOThread {
    fn drop(&mut self) {
        // Eagerly release the lockfile so the spill dir becomes GC-able even
        // though the writer thread still holds an Arc to it. Best-effort:
        // `LockFile::drop` also removes this file, so it may already be gone,
        // and a panicking `unwrap` inside `drop` would abort during unwind.
        let _ = std::fs::remove_file(&self._lockfile.path);
    }
}
/// Busy-waits until the IO thread has written every payload that was queued
/// before this call (i.e. `total` catches up with the snapshot of `sent`).
pub(in crate::executors::sinks) fn block_thread_until_io_thread_done(io_thread: &IOThread) {
    // Snapshot once: callers invoke this after all sends have completed.
    let expected = io_thread.sent.load(Ordering::Relaxed);
    loop {
        if io_thread.total.load(Ordering::Relaxed) == expected {
            break;
        }
        std::thread::park_timeout(Duration::from_millis(6))
    }
}
/// Marker file signalling that a spill directory is owned by a live process;
/// `gc_thread` uses its presence and mtime to decide whether a directory may
/// be reclaimed. Removed (best-effort) on drop.
struct LockFile {
    path: PathBuf,
}
impl LockFile {
    /// Creates (or truncates) the lockfile at `path`.
    ///
    /// # Errors
    /// Returns a `ComputeError` carrying the underlying I/O error when the
    /// file cannot be created (the previous version discarded that detail).
    fn new(path: PathBuf) -> PolarsResult<Self> {
        match File::create(&path) {
            Ok(_) => Ok(Self { path }),
            Err(e) => polars_bail!(ComputeError: "could not create lockfile: {}", e),
        }
    }
}
impl Drop for LockFile {
    fn drop(&mut self) {
        // Best-effort removal: the file may already be gone (IOThread's own
        // drop removes it too), and drop must never panic.
        std::fs::remove_file(&self.path).ok();
    }
}