#include "file/line_file_reader.h"
#include <cstring>
#include "monitoring/iostats_context_imp.h"
namespace ROCKSDB_NAMESPACE {
IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg,
RateLimiter* rate_limiter) {
std::unique_ptr<FSSequentialFile> file;
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new LineFileReader(
std::move(file), fname, nullptr,
std::vector<std::shared_ptr<EventListener>>{}, rate_limiter));
}
return io_s;
}
bool LineFileReader::ReadLine(std::string* out,
Env::IOPriority rate_limiter_priority) {
assert(out);
if (!io_status_.ok()) {
io_status_.MustCheck();
return false;
}
out->clear();
for (;;) {
const char* found = static_cast<const char*>(
std::memchr(buf_begin_, '\n', buf_end_ - buf_begin_));
if (found) {
size_t len = found - buf_begin_;
out->append(buf_begin_, len);
buf_begin_ += len + 1;
++line_number_;
return true;
}
if (at_eof_) {
io_status_.MustCheck();
return false;
}
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
io_status_ =
sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority);
IOSTATS_ADD(bytes_read, result.size());
if (!io_status_.ok()) {
io_status_.MustCheck();
return false;
}
if (result.size() != buf_.size()) {
at_eof_ = true;
}
buf_begin_ = result.data();
buf_end_ = result.data() + result.size();
}
}
}