#pragma once
#include <iostream>
#include <string>
#include <sstream>
#include <functional>
#include <unordered_set>
#include <utility>
#include <thread>
#include <fcntl.h>
#include <cstdio>
#include <cstdlib>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <mio/mmap.hpp>
#include "sdsl/bit_vectors.hpp"
#include "ips4o.hpp"
#include "atomic_queue.h"
namespace mmmulti {
template <typename Value> class set {
struct ValueLess {
bool operator()(const Value &a, const Value &b) const { return a < b; }
};
mio::mmap_source reader;
std::string filename;
std::string index_filename;
bool sorted = false;
uint64_t n_records = 0;
bool indexed = false;
std::thread writer_thread;
atomic_queue::AtomicQueue2<Value, 2 << 16> value_queue;
std::atomic<bool> work_todo;
public:
class iterator;
class const_iterator;
set(void) = delete;
set(const set& m) = delete;
set(set&& m) = delete;
set& operator=(set&& m) = delete;
set(const std::string& f) : filename(f) {
work_todo.store(false);
}
~set(void) {
close_writer();
close_reader();
}
void set_base_filename(const std::string& f) {
filename = f;
}
void writer_func(void) {
std::ofstream writer(filename.c_str(), std::ios::binary | std::ios::trunc);
if (writer.fail()) {
throw std::ios_base::failure(std::strerror(errno));
}
Value value;
while (work_todo.load() || !value_queue.was_empty()) {
if (value_queue.try_pop(value)) {
do {
writer.write((char*)&value, sizeof(Value));
} while (value_queue.try_pop(value));
} else {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
}
writer.close();
}
void open_writer(void) {
if (!work_todo.load()) {
work_todo.store(true);
writer_thread = std::thread(&set::writer_func, this);
}
}
void close_writer(void) {
if (work_todo.load()) {
work_todo.store(false);
while (!value_queue.was_empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (writer_thread.joinable()) {
writer_thread.join();
}
}
}
void open_reader(void) {
if (reader.is_mapped()) return;
std::error_code error;
reader = mio::make_mmap_source(
filename.c_str(), 0, mio::map_entire_file, error);
if (error) { assert(false); }
madvise((void*)reader.begin(),
reader.mapped_length(),
POSIX_MADV_WILLNEED | POSIX_MADV_SEQUENTIAL);
}
void close_reader(void) {
if (reader.is_mapped()) reader.unmap();
}
void append(const Value& v) {
value_queue.push(v);
}
size_t size(void) const {
return n_records;
}
iterator begin(void) {
return iterator((Value*)reader.begin());
}
iterator end(void) {
return iterator((Value*)reader.end());
}
const_iterator begin(void) const {
return const_iterator((Value*)reader.begin());
}
const_iterator end(void) const {
return const_iterator((Value*)reader.end());
}
size_t get_record_size(void) const {
return sizeof(Value);
}
size_t record_count(void) {
return reader.size() / sizeof(Value);
}
std::ifstream::pos_type filesize(const char* filename) {
std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary);
return in.tellg();
}
void sort(int num_threads) {
if (sorted) return;
std::error_code error;
mio::mmap_sink buffer = mio::make_mmap_sink(
filename.c_str(), 0, mio::map_entire_file, error);
if (error) { assert(false); }
ips4o::parallel::sort((Value*)buffer.begin(),
(Value*)buffer.end(),
ValueLess(),
num_threads);
sorted = true;
}
Value read_value(size_t i) const {
Value v;
memcpy(&v, &reader[i*sizeof(Value)], sizeof(Value));
return v;
}
void index(int num_threads) {
close_writer();
sort(num_threads);
open_reader();
n_records = record_count();
indexed = true;
close_reader();
open_reader();
}
void for_each_value(const std::function<void(const Value&)>& lambda) const {
for (size_t i = 0; i < n_records; ++i) {
lambda(read_value(i));
}
}
void for_each_value_count(const std::function<void(const Value&, const uint64_t& count)>& lambda) const {
assert(sorted);
uint64_t curr_count = 0;
bool first = true;
Value last;
for_each_value([&](const Value& v) {
if (first) {
last = v;
first = false;
} else if (v != last) {
lambda(last, curr_count);
curr_count = 0;
last = v;
}
++curr_count;
});
lambda(last, curr_count);
}
void for_each_unique_value(const std::function<void(const Value&)>& lambda) const {
assert(sorted);
bool first = true;
Value last;
for_each_value([&](const Value& v) {
if (first) {
last = v;
first = false;
} else if (v != last) {
lambda(last);
last = v;
}
});
lambda(last);
}
class iterator {
public:
iterator(Value* ptr) : ptr(ptr) {}
iterator() : ptr(nullptr) {}
iterator(const set<Value>::iterator& other) : ptr(other.ptr) {}
iterator& operator=(const set<Value>::iterator& other) {
ptr = other.ptr;
}
Value& operator*() {
return *ptr;
}
bool operator==(const set<Value>::iterator& other) {
return ptr == other.ptr;
}
bool operator!=(const set<Value>::iterator& other) {
return !(*this == other);
}
iterator& operator++() {
++ptr;
return *this;
}
iterator operator++(int) {
return iterator(ptr++);
}
iterator& operator--() {
--ptr;
return *this;
}
iterator operator--(int) {
return iterator(ptr--);
}
private:
Value* ptr;
friend class const_iterator;
};
class const_iterator {
public:
const_iterator(const Value* ptr) : ptr(ptr) {}
const_iterator() : ptr(nullptr) {}
const_iterator(const set<Value>::const_iterator& other) : ptr(other.ptr) {}
const_iterator& operator=(const set<Value>::const_iterator& other) {
ptr = other.ptr;
}
const_iterator(const set<Value>::iterator& other) : ptr(other.ptr) {}
const_iterator& operator=(const set<Value>::iterator& other) {
ptr = other.ptr;
}
const Value& operator*() {
return *ptr;
}
bool operator==(const set<Value>::const_iterator& other) {
return ptr == other.ptr;
}
bool operator!=(const set<Value>::const_iterator& other) {
return !(*this == other);
}
const_iterator& operator++() {
++ptr;
return *this;
}
const_iterator operator++(int) {
return const_iterator(ptr++);
}
const_iterator& operator--() {
--ptr;
return *this;
}
const_iterator operator--(int) {
return const_iterator(ptr--);
}
private:
const Value* ptr;
};
};
}